diff --git a/server/src/main/java/com/hedera/block/server/verification/StreamVerificationHandlerImpl.java b/server/src/main/java/com/hedera/block/server/verification/StreamVerificationHandlerImpl.java index c4b5445e..a4c0feaa 100644 --- a/server/src/main/java/com/hedera/block/server/verification/StreamVerificationHandlerImpl.java +++ b/server/src/main/java/com/hedera/block/server/verification/StreamVerificationHandlerImpl.java @@ -27,6 +27,7 @@ import com.hedera.block.server.metrics.MetricsService; import com.hedera.block.server.notifier.Notifier; import com.hedera.block.server.service.ServiceStatus; +import com.hedera.block.server.verification.service.BlockVerificationService; import com.hedera.hapi.block.BlockItemUnparsed; import com.hedera.hapi.block.SubscribeStreamResponseUnparsed; import com.hedera.pbj.runtime.OneOf; diff --git a/server/src/main/java/com/hedera/block/server/verification/VerificationInjectionModule.java b/server/src/main/java/com/hedera/block/server/verification/VerificationInjectionModule.java index 05ddec3f..bcbd9846 100644 --- a/server/src/main/java/com/hedera/block/server/verification/VerificationInjectionModule.java +++ b/server/src/main/java/com/hedera/block/server/verification/VerificationInjectionModule.java @@ -17,6 +17,9 @@ package com.hedera.block.server.verification; import com.hedera.block.server.metrics.MetricsService; +import com.hedera.block.server.verification.service.BlockVerificationService; +import com.hedera.block.server.verification.service.BlockVerificationServiceImpl; +import com.hedera.block.server.verification.service.NoOpBlockVerificationService; import com.hedera.block.server.verification.session.BlockVerificationSessionFactory; import com.hedera.block.server.verification.signature.SignatureVerifier; import com.hedera.block.server.verification.signature.SignatureVerifierDummy; @@ -49,7 +52,7 @@ static BlockVerificationService provideBlockVerificationService( if (verificationConfig.enabled()) { return new BlockVerificationServiceImpl(metricsService, blockVerificationSessionFactory); } else { - return new BlockVerificationServiceNoOp(); + return new NoOpBlockVerificationService(); } } } diff --git a/server/src/main/java/com/hedera/block/server/verification/hasher/NaiveStreamingTreeHasher.java b/server/src/main/java/com/hedera/block/server/verification/hasher/NaiveStreamingTreeHasher.java index 6381eca5..0e63110f 100644 --- a/server/src/main/java/com/hedera/block/server/verification/hasher/NaiveStreamingTreeHasher.java +++ b/server/src/main/java/com/hedera/block/server/verification/hasher/NaiveStreamingTreeHasher.java @@ -38,19 +38,6 @@ public class NaiveStreamingTreeHasher implements StreamingTreeHasher { private final List leafHashes = new ArrayList<>(); private boolean rootHashRequested = false; - /** - * Computes the root hash of a perfect binary Merkle tree of {@link ByteBuffer} leaves using a naive algorithm. - * @param leafHashes the leaf hashes of the tree - * @return the root hash of the tree - */ - public static Bytes computeRootHash(@NonNull final List leafHashes) { - final var hasher = new NaiveStreamingTreeHasher(); - for (final var hash : leafHashes) { - hasher.addLeaf(ByteBuffer.wrap(hash)); - } - return hasher.rootHash().join(); - } - @Override public void addLeaf(@NonNull final ByteBuffer hash) { if (rootHashRequested) { diff --git a/server/src/main/java/com/hedera/block/server/verification/BlockVerificationService.java b/server/src/main/java/com/hedera/block/server/verification/service/BlockVerificationService.java similarity index 95% rename from server/src/main/java/com/hedera/block/server/verification/BlockVerificationService.java rename to server/src/main/java/com/hedera/block/server/verification/service/BlockVerificationService.java index 5f3a288c..119a65fd 100644 --- a/server/src/main/java/com/hedera/block/server/verification/BlockVerificationService.java +++ b/server/src/main/java/com/hedera/block/server/verification/service/BlockVerificationService.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.hedera.block.server.verification; +package com.hedera.block.server.verification.service; import com.hedera.hapi.block.BlockItemUnparsed; import com.hedera.pbj.runtime.ParseException; diff --git a/server/src/main/java/com/hedera/block/server/verification/BlockVerificationServiceImpl.java b/server/src/main/java/com/hedera/block/server/verification/service/BlockVerificationServiceImpl.java similarity index 98% rename from server/src/main/java/com/hedera/block/server/verification/BlockVerificationServiceImpl.java rename to server/src/main/java/com/hedera/block/server/verification/service/BlockVerificationServiceImpl.java index 511168d9..0ccf6fa6 100644 --- a/server/src/main/java/com/hedera/block/server/verification/BlockVerificationServiceImpl.java +++ b/server/src/main/java/com/hedera/block/server/verification/service/BlockVerificationServiceImpl.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.hedera.block.server.verification; +package com.hedera.block.server.verification.service; import static java.lang.System.Logger.Level.WARNING; diff --git a/server/src/main/java/com/hedera/block/server/verification/BlockVerificationServiceNoOp.java b/server/src/main/java/com/hedera/block/server/verification/service/NoOpBlockVerificationService.java similarity index 87% rename from server/src/main/java/com/hedera/block/server/verification/BlockVerificationServiceNoOp.java rename to server/src/main/java/com/hedera/block/server/verification/service/NoOpBlockVerificationService.java index a206ab30..53c22b4a 100644 --- a/server/src/main/java/com/hedera/block/server/verification/BlockVerificationServiceNoOp.java +++ b/server/src/main/java/com/hedera/block/server/verification/service/NoOpBlockVerificationService.java @@ -14,12 +14,12 @@ * limitations under the License. */ -package com.hedera.block.server.verification; +package com.hedera.block.server.verification.service; import com.hedera.hapi.block.BlockItemUnparsed; import java.util.List; -public class BlockVerificationServiceNoOp implements BlockVerificationService { +public class NoOpBlockVerificationService implements BlockVerificationService { @Override public void onBlockItemsReceived(List blockItems) { // no-op diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index d4c8618d..dc6abed4 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -25,6 +25,7 @@ exports com.hedera.block.server.verification.hasher; exports com.hedera.block.server.verification.session; exports com.hedera.block.server.verification.signature; + exports com.hedera.block.server.verification.service; requires com.hedera.block.common; requires com.hedera.block.stream; diff --git a/server/src/test/java/com/hedera/block/server/notifier/NotifierImplTest.java b/server/src/test/java/com/hedera/block/server/notifier/NotifierImplTest.java index 3b7b5614..5d124208 100644 --- a/server/src/test/java/com/hedera/block/server/notifier/NotifierImplTest.java +++ b/server/src/test/java/com/hedera/block/server/notifier/NotifierImplTest.java @@ -44,8 +44,8 @@ import com.hedera.block.server.service.ServiceStatusImpl; import com.hedera.block.server.util.TestConfigUtil; import com.hedera.block.server.util.TestUtils; -import com.hedera.block.server.verification.BlockVerificationService; -import com.hedera.block.server.verification.BlockVerificationServiceNoOp; +import com.hedera.block.server.verification.service.BlockVerificationService; +import com.hedera.block.server.verification.service.NoOpBlockVerificationService; import com.hedera.block.server.verification.StreamVerificationHandlerImpl; import com.hedera.hapi.block.Acknowledgement; import com.hedera.hapi.block.BlockItemUnparsed; @@ -275,7 +275,7 @@ private PbjBlockStreamServiceProxy buildBlockStreamService(final Notifier notifi final var streamMediator = buildStreamMediator(new ConcurrentHashMap<>(32), serviceStatus); final var blockNodeEventHandler = new StreamPersistenceHandlerImpl( streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); - final BlockVerificationService blockVerificationService = new BlockVerificationServiceNoOp(); + final BlockVerificationService blockVerificationService = new NoOpBlockVerificationService(); final var streamVerificationHandler = new StreamVerificationHandlerImpl( streamMediator, notifier, blockNodeContext.metricsService(), serviceStatus, blockVerificationService); diff --git a/server/src/test/java/com/hedera/block/server/pbj/PbjBlockStreamServiceIntegrationTest.java b/server/src/test/java/com/hedera/block/server/pbj/PbjBlockStreamServiceIntegrationTest.java index 966ecedc..f36cc364 100644 --- a/server/src/test/java/com/hedera/block/server/pbj/PbjBlockStreamServiceIntegrationTest.java +++ b/server/src/test/java/com/hedera/block/server/pbj/PbjBlockStreamServiceIntegrationTest.java @@ -49,7 +49,7 @@ import com.hedera.block.server.service.ServiceStatus; import com.hedera.block.server.service.ServiceStatusImpl; import com.hedera.block.server.util.TestConfigUtil; -import com.hedera.block.server.verification.BlockVerificationService; +import com.hedera.block.server.verification.service.BlockVerificationService; import com.hedera.block.server.verification.StreamVerificationHandlerImpl; import com.hedera.hapi.block.Acknowledgement; import com.hedera.hapi.block.BlockItemSetUnparsed; diff --git a/server/src/test/java/com/hedera/block/server/verification/BlockVerificationServiceImplTest.java b/server/src/test/java/com/hedera/block/server/verification/BlockVerificationServiceImplTest.java new file mode 100644 index 00000000..8f3631b2 --- /dev/null +++ b/server/src/test/java/com/hedera/block/server/verification/BlockVerificationServiceImplTest.java @@ -0,0 +1,215 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.verification; + +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.*; + +import com.hedera.block.server.metrics.BlockNodeMetricTypes; +import com.hedera.block.server.metrics.MetricsService; +import com.hedera.block.server.verification.service.BlockVerificationService; +import com.hedera.block.server.verification.service.BlockVerificationServiceImpl; +import com.hedera.block.server.verification.session.BlockVerificationSession; +import com.hedera.block.server.verification.session.BlockVerificationSessionFactory; +import com.hedera.hapi.block.BlockItemUnparsed; +import com.hedera.hapi.block.stream.output.BlockHeader; +import com.hedera.pbj.runtime.ParseException; +import com.hedera.pbj.runtime.io.buffer.Bytes; +import com.swirlds.metrics.api.Counter; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +class BlockVerificationServiceImplTest { + + @Mock + private MetricsService metricsService; + + @Mock + private BlockVerificationSessionFactory sessionFactory; + + @Mock + private BlockVerificationSession previousSession; + + @Mock + private BlockVerificationSession newSession; + + @Mock + private Counter verificationBlocksReceived; + + @Mock + private Counter verificationBlocksFailed; + + @BeforeEach + void setUp() { + MockitoAnnotations.openMocks(this); + when(metricsService.get(BlockNodeMetricTypes.Counter.VerificationBlocksReceived)) + .thenReturn(verificationBlocksReceived); + when(metricsService.get(BlockNodeMetricTypes.Counter.VerificationBlocksFailed)) + .thenReturn(verificationBlocksFailed); + } + + @Test + void testOnBlockItemsReceivedWithNewBlockHeaderNoPreviousSession() throws ParseException { + // Given a new block header starting at block #10 + long blockNumber = 10; + BlockItemUnparsed blockHeaderItem = getBlockHeaderUnparsed(blockNumber); + List blockItems = List.of(blockHeaderItem); + + // No previous session + when(sessionFactory.createSession(any())).thenReturn(newSession); + + BlockVerificationService service = new BlockVerificationServiceImpl(metricsService, sessionFactory); + + // When + service.onBlockItemsReceived(blockItems); + + // Then + verify(verificationBlocksReceived).increment(); // new block received + verify(sessionFactory).createSession(getBlockHeader(blockNumber)); + verify(newSession).appendBlockItems(blockItems); + // No previous session, so just logs a warning internally + } + + @Test + void testOnBlockItemsReceivedWithNewBlockHeaderAndPreviousSessionHashMatch() throws ParseException { + // Given a previous verified block #9 and now receiving header for block #10 + long previousBlockNumber = 9; + long newBlockNumber = 10; + + BlockItemUnparsed blockHeaderItem = getBlockHeaderUnparsed(newBlockNumber); + List blockItems = List.of(blockHeaderItem); + + // Previous session result matches the expected previous hash + CompletableFuture future = new CompletableFuture<>(); + future.complete(getVerificationResult(previousBlockNumber)); + when(previousSession.getVerificationResult()).thenReturn(future); + + when(sessionFactory.createSession(getBlockHeader(newBlockNumber))).thenReturn(newSession); + + BlockVerificationServiceImpl service = new BlockVerificationServiceImpl(metricsService, sessionFactory); + setCurrentSession(service, previousSession); + + // When + service.onBlockItemsReceived(blockItems); + + // Then + verify(verificationBlocksReceived).increment(); + verify(verificationBlocksFailed, never()).increment(); + verify(newSession).appendBlockItems(blockItems); + } + + @Test + void testOnBlockItemsReceivedWithNewBlockHeaderAndPreviousSessionHashMismatch() throws ParseException { + // Given a previous block #9 but now we produce a verification result that doesn't match the new header's prev + // hash + long previousBlockNumber = 9; + long newBlockNumber = 10; + + BlockItemUnparsed blockHeaderItem = getBlockHeaderUnparsed(newBlockNumber); + List blockItems = List.of(blockHeaderItem); + + // Make the previous session result have a different hash (e.g., block #99) + CompletableFuture future = new CompletableFuture<>(); + future.complete(getVerificationResult(99)); // This gives hash99, not hash9 + when(previousSession.getVerificationResult()).thenReturn(future); + + when(sessionFactory.createSession(getBlockHeader(newBlockNumber))).thenReturn(newSession); + + BlockVerificationServiceImpl service = new BlockVerificationServiceImpl(metricsService, sessionFactory); + setCurrentSession(service, previousSession); + + // When + service.onBlockItemsReceived(blockItems); + + // Then + verify(verificationBlocksReceived).increment(); + verify(verificationBlocksFailed).increment(); // mismatch should cause increment + verify(newSession).appendBlockItems(blockItems); + } + + @Test + void testOnBlockItemsReceivedNoBlockHeaderNoCurrentSession() throws ParseException { + BlockItemUnparsed normalItem = getNormalBlockItem(); + List blockItems = List.of(normalItem); + + BlockVerificationService service = new BlockVerificationServiceImpl(metricsService, sessionFactory); + + // When + service.onBlockItemsReceived(blockItems); + + // Then + // Just logs a warning. No increments or sessions created. + verifyNoInteractions(sessionFactory); + verifyNoInteractions(verificationBlocksReceived, verificationBlocksFailed); + } + + @Test + void testOnBlockItemsReceivedNoBlockHeaderWithCurrentSession() throws ParseException { + BlockItemUnparsed normalItem = getNormalBlockItem(); + List blockItems = List.of(normalItem); + + BlockVerificationServiceImpl service = new BlockVerificationServiceImpl(metricsService, sessionFactory); + setCurrentSession(service, previousSession); + + // When + service.onBlockItemsReceived(blockItems); + + // Then + verify(previousSession).appendBlockItems(blockItems); + verifyNoInteractions(verificationBlocksReceived, verificationBlocksFailed); + } + + private VerificationResult getVerificationResult(long blockNumber) { + return new VerificationResult( + blockNumber, Bytes.wrap(("hash" + blockNumber).getBytes()), BlockVerificationStatus.VERIFIED); + } + + private BlockHeader getBlockHeader(long blockNumber) { + long previousBlockNumber = blockNumber - 1; + + return BlockHeader.newBuilder() + .previousBlockHash(Bytes.wrap(("hash" + previousBlockNumber).getBytes())) + .number(blockNumber) + .build(); + } + + private BlockItemUnparsed getBlockHeaderUnparsed(long blockNumber) { + return BlockItemUnparsed.newBuilder() + .blockHeader(BlockHeader.PROTOBUF.toBytes(getBlockHeader(blockNumber))) + .build(); + } + + private BlockItemUnparsed getNormalBlockItem() { + // A block item without a block header + return BlockItemUnparsed.newBuilder().build(); + } + + // Helper method to set the currentSession field via reflection since it’s private + private static void setCurrentSession(BlockVerificationServiceImpl service, BlockVerificationSession session) { + try { + var field = BlockVerificationServiceImpl.class.getDeclaredField("currentSession"); + field.setAccessible(true); + field.set(service, session); + } catch (NoSuchFieldException | IllegalAccessException e) { + fail("Unable to set currentSession via reflection", e); + } + } +} diff --git a/server/src/test/java/com/hedera/block/server/verification/hasher/ConcurrentStreamingTreeHasherTest.java b/server/src/test/java/com/hedera/block/server/verification/hasher/ConcurrentStreamingTreeHasherTest.java new file mode 100644 index 00000000..3e5628b2 --- /dev/null +++ b/server/src/test/java/com/hedera/block/server/verification/hasher/ConcurrentStreamingTreeHasherTest.java @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.verification.hasher; + +import static com.hedera.block.server.verification.hasher.ConcurrentStreamingTreeHasher.rootHashFrom; +import static com.hedera.block.server.verification.hasher.StreamingTreeHasher.HASH_LENGTH; +import static java.util.Objects.requireNonNull; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import com.hedera.pbj.runtime.io.buffer.Bytes; +import java.nio.ByteBuffer; +import java.util.SplittableRandom; +import java.util.concurrent.ForkJoinPool; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +class ConcurrentStreamingTreeHasherTest { + private static final SplittableRandom RANDOM = new SplittableRandom(); + + private final NaiveStreamingTreeHasher comparison = new NaiveStreamingTreeHasher(); + private final ConcurrentStreamingTreeHasher subject = new ConcurrentStreamingTreeHasher(ForkJoinPool.commonPool()); + + @ParameterizedTest + @ValueSource(ints = {0, 1, 3, 5, 32, 69, 100, 123, 234}) + void testAddLeafAndRootHash(final int numLeaves) { + ByteBuffer lastLeafHash = null; + var status = StreamingTreeHasher.Status.EMPTY; + for (int i = 1; i <= numLeaves; i++) { + final var hash = new byte[HASH_LENGTH]; + RANDOM.nextBytes(hash); + final var leafHash = ByteBuffer.wrap(hash); + subject.addLeaf(ByteBuffer.wrap(hash)); + comparison.addLeaf(ByteBuffer.wrap(hash)); + if (i == numLeaves - 1) { + status = subject.status(); + } else if (i == numLeaves) { + lastLeafHash = leafHash; + } + } + + final var actual = subject.rootHash().join(); + final var expected = comparison.rootHash().join(); + assertEquals(expected, actual); + if (lastLeafHash != null) { + requireNonNull(status); + final var recalculated = rootHashFrom(status, Bytes.wrap(lastLeafHash.array())); + assertEquals(expected, recalculated); + } + } + + @Test + void testAddLeafAfterRootHashRequested() { + final var leaf = ByteBuffer.allocate(48); + subject.addLeaf(leaf); + subject.rootHash(); + assertThrows(IllegalStateException.class, () -> subject.addLeaf(leaf)); + } +}