Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add method to check if connection is alive in BlockSync/BlockRanngSync #46

Merged
merged 3 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,16 @@ public void blockFound(Block block) {

blockFetcher.fetch(from, to);

int aliveCount = 0;
while (true) {
aliveCount++;
if (aliveCount % 10 == 0) {
int random =(int) Math.random()*(65000-0+1)+0;
blockFetcher.sendKeepAliveMessage(random);
}
int min = 1;
int max = 65000;
int randomNum = (int)(Math.random() * (max - min + 1)) + min;
blockFetcher.sendKeepAliveMessage(randomNum);

System.out.println("Last Keep Alive Message Time : " + blockFetcher.getLastKeepAliveResponseTime());
System.out.println("Last Keep Alive Message Cookie : " + blockFetcher.getLastKeepAliveResponseCookie());

Thread.sleep(1000);
Thread.sleep(3000);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package com.bloxbean.cardano.yaci.helper;

import com.bloxbean.cardano.yaci.core.common.Constants;
import com.bloxbean.cardano.yaci.core.model.Block;
import com.bloxbean.cardano.yaci.core.model.Era;
import com.bloxbean.cardano.yaci.core.protocol.chainsync.messages.Point;
import com.bloxbean.cardano.yaci.helper.listener.BlockChainDataListener;
import com.bloxbean.cardano.yaci.helper.model.Transaction;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -40,4 +43,46 @@ public void onBlock(Era era, Block block, List<Transaction> transactions) {
assertThat(blocks.get(0).getHeader().getHeaderBody().getSlot()).isEqualTo(13107194);
assertThat(blocks.get(2).getHeader().getHeaderBody().getSlot()).isEqualTo(13107220);
}

@Test
@Disabled
void fetch_tillTip() throws InterruptedException {
BlockRangeSync blockRangeSync = new BlockRangeSync(node, nodePort, protocolMagic);
AtomicInteger blockCount = new AtomicInteger(0);
blockRangeSync.start(new BlockChainDataListener() {
@Override
public void onBlock(Era era, Block block, List<Transaction> transactions) {
int count = blockCount.incrementAndGet();
if (count % 1000 == 0)
System.out.println("Block: " + block.getHeader().getHeaderBody().getBlockNumber());
}
});

Point from = null;
Point to = null;
if (protocolMagic == Constants.SANCHONET_PROTOCOL_MAGIC) {
from = new Point(60, "8f4e50c397cf0796e6ac9b6db9fc0b761a29f1a040a7f1cfaa35513e3cc4db38");
to = new Point(19397676, "804046ba432b676895198d2dc9ae8f0f842f7dd74b8aba71f12dc98594548361");
} else if (protocolMagic == Constants.PREPROD_PROTOCOL_MAGIC) {
from = new Point(2, "1d031daf47281f69cd95ab929c269fd26b1434a56a5bbbd65b7afe85ef96b233");
to = new Point(50468813, "2fb2554a9fec38ce4b8121c001087f867b1bd19cda11e93dc5475dc253baf0e9");
} else if (protocolMagic == Constants.MAINNET_PROTOCOL_MAGIC) {
from = new Point(1, "1dbc81e3196ba4ab9dcb07e1c37bb28ae1c289c0707061f28b567c2f48698d50");
to = new Point(114620634, "fc1e525bd6406a1bf01b2423ea761336546ff14fc5bb3c4b711b60f57ae143a4");
}

blockRangeSync.fetch(from, to);

while (true) {
int min = 1;
int max = 65000;
int randomNum = (int)(Math.random() * (max - min + 1)) + min;
blockRangeSync.sendKeepAliveMessage(randomNum);

System.out.println("Last Keep Alive Message Time : " + blockRangeSync.getLastKeepAliveResponseTime());
System.out.println("Last Keep Alive Message Cookie : " + blockRangeSync.getLastKeepAliveResponseCookie());

Thread.sleep(2000);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.bloxbean.cardano.yaci.core.protocol.chainsync.messages.Tip;
import com.bloxbean.cardano.yaci.helper.listener.BlockChainDataListener;
import com.bloxbean.cardano.yaci.helper.model.Transaction;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.util.List;
Expand Down Expand Up @@ -41,6 +42,36 @@ public void onBlock(Era era, Block block, List<Transaction> transactions) {
assertThat(blockNo.get()).isGreaterThan(420800);
}

@Test
@Disabled
void syncFromTip_dontStop() throws InterruptedException {
BlockSync blockSync = new BlockSync(node, nodePort, protocolMagic, Constants.WELL_KNOWN_PREPROD_POINT);

blockSync.startSyncFromTip(new BlockChainDataListener() {

public void onBlock(Era era, Block block, List<Transaction> transactions) {
System.out.println(block.getHeader().getHeaderBody().getBlockNumber());
System.out.println("# of transactions >> " + transactions.size());
}

});

int aliveCount = 0;
while (true) {
aliveCount++;
if (aliveCount % 10 == 0) {
int min = 1;
int max = 65000;
int randomNum = (int)(Math.random() * (max - min + 1)) + min;
blockSync.sendKeepAliveMessage(randomNum);
}

System.out.println("Last Keep Alive Message Time : " + blockSync.getLastKeepAliveResponseTime());
System.out.println("Last Keep Alive Message Cookie : " + blockSync.getLastKeepAliveResponseCookie());
Thread.sleep(2000);
}
}

@Test
void syncFromPoint() throws InterruptedException {
BlockSync blockSync = new BlockSync(node, nodePort, protocolMagic, Constants.WELL_KNOWN_PREPROD_POINT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ public class BlockFetcher implements Fetcher<Block> {
private BlockfetchAgent blockfetchAgent;
private TCPNodeClient n2nClient;

private int lastKeepAliveResponseCookie = 0;
private long lastKeepAliveResponseTime = 0;

/**
* Constructor to create BlockFetcher instance
* @param host Cardano node host
Expand Down Expand Up @@ -91,6 +94,11 @@ public void handshakeOk() {
keepAliveAgent.sendKeepAlive(1234);
}
});

keepAliveAgent.addListener(response -> {
lastKeepAliveResponseCookie = response.getCookie();
lastKeepAliveResponseTime = System.currentTimeMillis();
});
}

/**
Expand Down Expand Up @@ -162,18 +170,20 @@ public void sendKeepAliveMessage(int cookie) {
keepAliveAgent.sendKeepAlive(cookie);
}

// public static void main(String[] args) {
// //shelley
// Point from = new Point(16588737, "4e9bbbb67e3ae262133d94c3da5bffce7b1127fc436e7433b87668dba34c354a");
// Point to = new Point(70223766, "21155bb822637508a91e9952e712040c0ea45107fb91898bfe8c9a95389b0d90");
//
// VersionTable versionTable = N2NVersionTableConstant.v4AndAbove(Networks.mainnet().getProtocolMagic());
// BlockFetcher blockFetcher = new BlockFetcher("192.168.0.228", 6000, versionTable);
//
// blockFetcher.start(block -> {
// log.info("Block >>> {} -- {} {}", block.getHeader().getHeaderBody().getBlockNumber(), block.getHeader().getHeaderBody().getSlot() + " ", block.getEra());
// });
//
// blockFetcher.fetch(from, to);
// }
/**
* Get the last keep alive response cookie
* @return
*/
public int getLastKeepAliveResponseCookie() {
return lastKeepAliveResponseCookie;
}

/**
* Get the last keep alive response time
* @return
*/
public long getLastKeepAliveResponseTime() {
return lastKeepAliveResponseTime;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,41 @@ public void fetch(Point from, Point to) {
blockFetcher.fetch(from, to);
}

/**
* Send keep alive message
* @param cookie
*/
public void sendKeepAliveMessage(int cookie) {
blockFetcher.sendKeepAliveMessage(cookie);
}

/**
* Get the last keep alive response cookie
* @return
*/
public int getLastKeepAliveResponseCookie() {
return blockFetcher.getLastKeepAliveResponseCookie();
}

/**
* Get the last keep alive response time
* @return
*/
public long getLastKeepAliveResponseTime() {
return blockFetcher.getLastKeepAliveResponseTime();
}

/**
* Stop the fetcher
*/
public void stop() {
blockFetcher.shutdown();
}

/**
* Check if the connection is alive
*/
public boolean isRunning() {
return blockFetcher.isRunning();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,42 @@ public void startSyncFromTip(BlockChainDataListener blockChainDataListener) {
initializeAgentAndStart(wellKnownPoint, blockChainDataListener, true);
}

/**
* Send keep alive message
* @param cookie
*/
public void sendKeepAliveMessage(int cookie) {
if (n2NChainSyncFetcher.isRunning())
n2NChainSyncFetcher.sendKeepAliveMessage(cookie);
}

/**
* Get the last keep alive response cookie
* @return
*/
public int getLastKeepAliveResponseCookie() {
return n2NChainSyncFetcher.getLastKeepAliveResponseCookie();
}

/**
* Get the last keep alive response time
* @return
*/
public long getLastKeepAliveResponseTime() {
return n2NChainSyncFetcher.getLastKeepAliveResponseTime();
}

/**
* Stop the fetcher
*/
public void stop() {
n2NChainSyncFetcher.shutdown();
}

/**
* Check if the connection is alive
*/
public boolean isRunning() {
return n2NChainSyncFetcher.isRunning();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ public class N2NChainSyncFetcher implements Fetcher<Block> {
private BlockfetchAgent blockFetchAgent;
private TCPNodeClient n2nClient;

private int lastKeepAliveResponseCookie = 0;
private long lastKeepAliveResponseTime = 0;

/**
* Construct {@link N2NChainSyncFetcher} to sync the blockchain
*
Expand Down Expand Up @@ -193,6 +196,11 @@ public void byronEbBlockFound(ByronEbBlock byronEbBlock) {
}
});

keepAliveAgent.addListener(response -> {
lastKeepAliveResponseCookie = response.getCookie();
lastKeepAliveResponseTime = System.currentTimeMillis();
});

n2nClient = new TCPNodeClient(host, port, handshakeAgent, keepAliveAgent,
chainSyncAgent, blockFetchAgent);
}
Expand Down Expand Up @@ -253,11 +261,31 @@ public void addChainSyncListener(ChainSyncAgentListener listener) {
chainSyncAgent.addListener(listener);
}

/**
* Send keep alive message
* @param cookie
*/
public void sendKeepAliveMessage(int cookie) {
if (n2nClient.isRunning())
keepAliveAgent.sendKeepAlive(cookie);
}

/**
* Get the last keep alive response cookie
* @return
*/
public int getLastKeepAliveResponseCookie() {
return lastKeepAliveResponseCookie;
}

/**
* Get the last keep alive response time
* @return
*/
public long getLastKeepAliveResponseTime() {
return lastKeepAliveResponseTime;
}

/**
* Check if the connection is alive
*
Expand Down
Loading