From 43a7505707ddaacf603a54be1dd04eb69f2a9492 Mon Sep 17 00:00:00 2001 From: sotatek-huyle3 Date: Mon, 5 Feb 2024 10:10:27 +0700 Subject: [PATCH 1/2] feat: #45 Add method to check if connection is alive in BlockSync/BlockRangeSync --- .../com/bloxbean/cardano/yaci/helper/BlockRangeSync.java | 7 +++++++ .../java/com/bloxbean/cardano/yaci/helper/BlockSync.java | 6 ++++++ 2 files changed, 13 insertions(+) diff --git a/helper/src/main/java/com/bloxbean/cardano/yaci/helper/BlockRangeSync.java b/helper/src/main/java/com/bloxbean/cardano/yaci/helper/BlockRangeSync.java index df3c34c..4afafc5 100644 --- a/helper/src/main/java/com/bloxbean/cardano/yaci/helper/BlockRangeSync.java +++ b/helper/src/main/java/com/bloxbean/cardano/yaci/helper/BlockRangeSync.java @@ -71,4 +71,11 @@ public void sendKeepAliveMessage(int cookie) { public void stop() { blockFetcher.shutdown(); } + + /** + * Check if the connection is alive + */ + public boolean isRunning() { + return blockFetcher.isRunning(); + } } diff --git a/helper/src/main/java/com/bloxbean/cardano/yaci/helper/BlockSync.java b/helper/src/main/java/com/bloxbean/cardano/yaci/helper/BlockSync.java index b822f35..c2c6210 100644 --- a/helper/src/main/java/com/bloxbean/cardano/yaci/helper/BlockSync.java +++ b/helper/src/main/java/com/bloxbean/cardano/yaci/helper/BlockSync.java @@ -91,4 +91,10 @@ public void stop() { n2NChainSyncFetcher.shutdown(); } + /** + * Check if the connection is alive + */ + public boolean isRunning() { + return n2NChainSyncFetcher.isRunning(); + } } From 821279bc7e1b40f4d03b9e20fc46f9a624f3d042 Mon Sep 17 00:00:00 2001 From: Satya Date: Mon, 5 Feb 2024 13:25:47 +0800 Subject: [PATCH 2/2] feat: #45 Add method to return last keep alive message and receive time --- .../cardano/yaci/helper/BlockFetcherIT.java | 15 ++++--- .../cardano/yaci/helper/BlockRangeSyncIT.java | 45 +++++++++++++++++++ .../cardano/yaci/helper/BlockSyncIT.java | 31 +++++++++++++ .../cardano/yaci/helper/BlockFetcher.java | 38 ++++++++++------ .../cardano/yaci/helper/BlockRangeSync.java | 20 +++++++++ .../cardano/yaci/helper/BlockSync.java | 20 +++++++++ .../yaci/helper/N2NChainSyncFetcher.java | 28 ++++++++++++ 7 files changed, 176 insertions(+), 21 deletions(-) diff --git a/helper/src/integrationTest/java/com/bloxbean/cardano/yaci/helper/BlockFetcherIT.java b/helper/src/integrationTest/java/com/bloxbean/cardano/yaci/helper/BlockFetcherIT.java index e231bf9..4dbb84d 100644 --- a/helper/src/integrationTest/java/com/bloxbean/cardano/yaci/helper/BlockFetcherIT.java +++ b/helper/src/integrationTest/java/com/bloxbean/cardano/yaci/helper/BlockFetcherIT.java @@ -92,15 +92,16 @@ public void blockFound(Block block) { blockFetcher.fetch(from, to); - int aliveCount = 0; while (true) { - aliveCount++; - if (aliveCount % 10 == 0) { - int random =(int) Math.random()*(65000-0+1)+0; - blockFetcher.sendKeepAliveMessage(random); - } + int min = 1; + int max = 65000; + int randomNum = (int)(Math.random() * (max - min + 1)) + min; + blockFetcher.sendKeepAliveMessage(randomNum); + + System.out.println("Last Keep Alive Message Time : " + blockFetcher.getLastKeepAliveResponseTime()); + System.out.println("Last Keep Alive Message Cookie : " + blockFetcher.getLastKeepAliveResponseCookie()); - Thread.sleep(1000); + Thread.sleep(3000); } } diff --git a/helper/src/integrationTest/java/com/bloxbean/cardano/yaci/helper/BlockRangeSyncIT.java b/helper/src/integrationTest/java/com/bloxbean/cardano/yaci/helper/BlockRangeSyncIT.java index 357540f..80a2af5 100644 --- a/helper/src/integrationTest/java/com/bloxbean/cardano/yaci/helper/BlockRangeSyncIT.java +++ b/helper/src/integrationTest/java/com/bloxbean/cardano/yaci/helper/BlockRangeSyncIT.java @@ -1,16 +1,19 @@ package com.bloxbean.cardano.yaci.helper; +import com.bloxbean.cardano.yaci.core.common.Constants; import com.bloxbean.cardano.yaci.core.model.Block; import com.bloxbean.cardano.yaci.core.model.Era; import com.bloxbean.cardano.yaci.core.protocol.chainsync.messages.Point; import com.bloxbean.cardano.yaci.helper.listener.BlockChainDataListener; import com.bloxbean.cardano.yaci.helper.model.Transaction; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; @@ -40,4 +43,46 @@ public void onBlock(Era era, Block block, List transactions) { assertThat(blocks.get(0).getHeader().getHeaderBody().getSlot()).isEqualTo(13107194); assertThat(blocks.get(2).getHeader().getHeaderBody().getSlot()).isEqualTo(13107220); } + + @Test + @Disabled + void fetch_tillTip() throws InterruptedException { + BlockRangeSync blockRangeSync = new BlockRangeSync(node, nodePort, protocolMagic); + AtomicInteger blockCount = new AtomicInteger(0); + blockRangeSync.start(new BlockChainDataListener() { + @Override + public void onBlock(Era era, Block block, List transactions) { + int count = blockCount.incrementAndGet(); + if (count % 1000 == 0) + System.out.println("Block: " + block.getHeader().getHeaderBody().getBlockNumber()); + } + }); + + Point from = null; + Point to = null; + if (protocolMagic == Constants.SANCHONET_PROTOCOL_MAGIC) { + from = new Point(60, "8f4e50c397cf0796e6ac9b6db9fc0b761a29f1a040a7f1cfaa35513e3cc4db38"); + to = new Point(19397676, "804046ba432b676895198d2dc9ae8f0f842f7dd74b8aba71f12dc98594548361"); + } else if (protocolMagic == Constants.PREPROD_PROTOCOL_MAGIC) { + from = new Point(2, "1d031daf47281f69cd95ab929c269fd26b1434a56a5bbbd65b7afe85ef96b233"); + to = new Point(50468813, "2fb2554a9fec38ce4b8121c001087f867b1bd19cda11e93dc5475dc253baf0e9"); + } else if (protocolMagic == Constants.MAINNET_PROTOCOL_MAGIC) { + from = new Point(1, "1dbc81e3196ba4ab9dcb07e1c37bb28ae1c289c0707061f28b567c2f48698d50"); + to = new Point(114620634, "fc1e525bd6406a1bf01b2423ea761336546ff14fc5bb3c4b711b60f57ae143a4"); + } + + blockRangeSync.fetch(from, to); + + while (true) { + int min = 1; + int max = 65000; + int randomNum = (int)(Math.random() * (max - min + 1)) + min; + blockRangeSync.sendKeepAliveMessage(randomNum); + + System.out.println("Last Keep Alive Message Time : " + blockRangeSync.getLastKeepAliveResponseTime()); + System.out.println("Last Keep Alive Message Cookie : " + blockRangeSync.getLastKeepAliveResponseCookie()); + + Thread.sleep(2000); + } + } } diff --git a/helper/src/integrationTest/java/com/bloxbean/cardano/yaci/helper/BlockSyncIT.java b/helper/src/integrationTest/java/com/bloxbean/cardano/yaci/helper/BlockSyncIT.java index 4b3d368..20e020d 100644 --- a/helper/src/integrationTest/java/com/bloxbean/cardano/yaci/helper/BlockSyncIT.java +++ b/helper/src/integrationTest/java/com/bloxbean/cardano/yaci/helper/BlockSyncIT.java @@ -7,6 +7,7 @@ import com.bloxbean.cardano.yaci.core.protocol.chainsync.messages.Tip; import com.bloxbean.cardano.yaci.helper.listener.BlockChainDataListener; import com.bloxbean.cardano.yaci.helper.model.Transaction; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.List; @@ -41,6 +42,36 @@ public void onBlock(Era era, Block block, List transactions) { assertThat(blockNo.get()).isGreaterThan(420800); } + @Test + @Disabled + void syncFromTip_dontStop() throws InterruptedException { + BlockSync blockSync = new BlockSync(node, nodePort, protocolMagic, Constants.WELL_KNOWN_PREPROD_POINT); + + blockSync.startSyncFromTip(new BlockChainDataListener() { + + public void onBlock(Era era, Block block, List transactions) { + System.out.println(block.getHeader().getHeaderBody().getBlockNumber()); + System.out.println("# of transactions >> " + transactions.size()); + } + + }); + + int aliveCount = 0; + while (true) { + aliveCount++; + if (aliveCount % 10 == 0) { + int min = 1; + int max = 65000; + int randomNum = (int)(Math.random() * (max - min + 1)) + min; + blockSync.sendKeepAliveMessage(randomNum); + } + + System.out.println("Last Keep Alive Message Time : " + blockSync.getLastKeepAliveResponseTime()); + System.out.println("Last Keep Alive Message Cookie : " + blockSync.getLastKeepAliveResponseCookie()); + Thread.sleep(2000); + } + } + @Test void syncFromPoint() throws InterruptedException { BlockSync blockSync = new BlockSync(node, nodePort, protocolMagic, Constants.WELL_KNOWN_PREPROD_POINT); diff --git a/helper/src/main/java/com/bloxbean/cardano/yaci/helper/BlockFetcher.java b/helper/src/main/java/com/bloxbean/cardano/yaci/helper/BlockFetcher.java index e32185f..4ac5e32 100644 --- a/helper/src/main/java/com/bloxbean/cardano/yaci/helper/BlockFetcher.java +++ b/helper/src/main/java/com/bloxbean/cardano/yaci/helper/BlockFetcher.java @@ -55,6 +55,9 @@ public class BlockFetcher implements Fetcher { private BlockfetchAgent blockfetchAgent; private TCPNodeClient n2nClient; + private int lastKeepAliveResponseCookie = 0; + private long lastKeepAliveResponseTime = 0; + /** * Constructor to create BlockFetcher instance * @param host Cardano node host @@ -91,6 +94,11 @@ public void handshakeOk() { keepAliveAgent.sendKeepAlive(1234); } }); + + keepAliveAgent.addListener(response -> { + lastKeepAliveResponseCookie = response.getCookie(); + lastKeepAliveResponseTime = System.currentTimeMillis(); + }); } /** @@ -162,18 +170,20 @@ public void sendKeepAliveMessage(int cookie) { keepAliveAgent.sendKeepAlive(cookie); } -// public static void main(String[] args) { -// //shelley -// Point from = new Point(16588737, "4e9bbbb67e3ae262133d94c3da5bffce7b1127fc436e7433b87668dba34c354a"); -// Point to = new Point(70223766, "21155bb822637508a91e9952e712040c0ea45107fb91898bfe8c9a95389b0d90"); -// -// VersionTable versionTable = N2NVersionTableConstant.v4AndAbove(Networks.mainnet().getProtocolMagic()); -// BlockFetcher blockFetcher = new BlockFetcher("192.168.0.228", 6000, versionTable); -// -// blockFetcher.start(block -> { -// log.info("Block >>> {} -- {} {}", block.getHeader().getHeaderBody().getBlockNumber(), block.getHeader().getHeaderBody().getSlot() + " ", block.getEra()); -// }); -// -// blockFetcher.fetch(from, to); -// } + /** + * Get the last keep alive response cookie + * @return + */ + public int getLastKeepAliveResponseCookie() { + return lastKeepAliveResponseCookie; + } + + /** + * Get the last keep alive response time + * @return + */ + public long getLastKeepAliveResponseTime() { + return lastKeepAliveResponseTime; + } + } diff --git a/helper/src/main/java/com/bloxbean/cardano/yaci/helper/BlockRangeSync.java b/helper/src/main/java/com/bloxbean/cardano/yaci/helper/BlockRangeSync.java index 4afafc5..e775657 100644 --- a/helper/src/main/java/com/bloxbean/cardano/yaci/helper/BlockRangeSync.java +++ b/helper/src/main/java/com/bloxbean/cardano/yaci/helper/BlockRangeSync.java @@ -61,10 +61,30 @@ public void fetch(Point from, Point to) { blockFetcher.fetch(from, to); } + /** + * Send keep alive message + * @param cookie + */ public void sendKeepAliveMessage(int cookie) { blockFetcher.sendKeepAliveMessage(cookie); } + /** + * Get the last keep alive response cookie + * @return + */ + public int getLastKeepAliveResponseCookie() { + return blockFetcher.getLastKeepAliveResponseCookie(); + } + + /** + * Get the last keep alive response time + * @return + */ + public long getLastKeepAliveResponseTime() { + return blockFetcher.getLastKeepAliveResponseTime(); + } + /** * Stop the fetcher */ diff --git a/helper/src/main/java/com/bloxbean/cardano/yaci/helper/BlockSync.java b/helper/src/main/java/com/bloxbean/cardano/yaci/helper/BlockSync.java index c2c6210..c5ccc43 100644 --- a/helper/src/main/java/com/bloxbean/cardano/yaci/helper/BlockSync.java +++ b/helper/src/main/java/com/bloxbean/cardano/yaci/helper/BlockSync.java @@ -79,11 +79,31 @@ public void startSyncFromTip(BlockChainDataListener blockChainDataListener) { initializeAgentAndStart(wellKnownPoint, blockChainDataListener, true); } + /** + * Send keep alive message + * @param cookie + */ public void sendKeepAliveMessage(int cookie) { if (n2NChainSyncFetcher.isRunning()) n2NChainSyncFetcher.sendKeepAliveMessage(cookie); } + /** + * Get the last keep alive response cookie + * @return + */ + public int getLastKeepAliveResponseCookie() { + return n2NChainSyncFetcher.getLastKeepAliveResponseCookie(); + } + + /** + * Get the last keep alive response time + * @return + */ + public long getLastKeepAliveResponseTime() { + return n2NChainSyncFetcher.getLastKeepAliveResponseTime(); + } + /** * Stop the fetcher */ diff --git a/helper/src/main/java/com/bloxbean/cardano/yaci/helper/N2NChainSyncFetcher.java b/helper/src/main/java/com/bloxbean/cardano/yaci/helper/N2NChainSyncFetcher.java index 47e2c3b..a812658 100644 --- a/helper/src/main/java/com/bloxbean/cardano/yaci/helper/N2NChainSyncFetcher.java +++ b/helper/src/main/java/com/bloxbean/cardano/yaci/helper/N2NChainSyncFetcher.java @@ -46,6 +46,9 @@ public class N2NChainSyncFetcher implements Fetcher { private BlockfetchAgent blockFetchAgent; private TCPNodeClient n2nClient; + private int lastKeepAliveResponseCookie = 0; + private long lastKeepAliveResponseTime = 0; + /** * Construct {@link N2NChainSyncFetcher} to sync the blockchain * @@ -193,6 +196,11 @@ public void byronEbBlockFound(ByronEbBlock byronEbBlock) { } }); + keepAliveAgent.addListener(response -> { + lastKeepAliveResponseCookie = response.getCookie(); + lastKeepAliveResponseTime = System.currentTimeMillis(); + }); + n2nClient = new TCPNodeClient(host, port, handshakeAgent, keepAliveAgent, chainSyncAgent, blockFetchAgent); } @@ -253,11 +261,31 @@ public void addChainSyncListener(ChainSyncAgentListener listener) { chainSyncAgent.addListener(listener); } + /** + * Send keep alive message + * @param cookie + */ public void sendKeepAliveMessage(int cookie) { if (n2nClient.isRunning()) keepAliveAgent.sendKeepAlive(cookie); } + /** + * Get the last keep alive response cookie + * @return + */ + public int getLastKeepAliveResponseCookie() { + return lastKeepAliveResponseCookie; + } + + /** + * Get the last keep alive response time + * @return + */ + public long getLastKeepAliveResponseTime() { + return lastKeepAliveResponseTime; + } + /** * Check if the connection is alive *