Skip to content

Commit

Permalink
add temporary code
Browse files Browse the repository at this point in the history
Signed-off-by: georgi-l95 <[email protected]>
  • Loading branch information
georgi-l95 committed Nov 25, 2024
1 parent 4d37a65 commit d6d5deb
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.swirlds.metrics.api.Metrics;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.net.ServerSocket;
import java.nio.file.Path;
import java.util.Map;

Expand Down Expand Up @@ -55,4 +56,10 @@ public static Metrics getTestMetrics(@NonNull Configuration configuration) {
metricsProvider.start();
return metrics;
}

public static int findFreePort() throws IOException {
try (ServerSocket socket = new ServerSocket(0)) {
return socket.getLocalPort();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.hedera.block.simulator.grpc.impl;

import static com.hedera.block.simulator.TestUtils.findFreePort;
import static com.hedera.block.simulator.TestUtils.getTestMetrics;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;
Expand Down Expand Up @@ -46,15 +47,13 @@ public class ConsumerStreamGrpcClientImplTest {
@Mock
private GrpcConfig grpcConfig;

private MetricsService metricsService;
private ConsumerStreamGrpcClient consumerStreamGrpcClientImpl;
private Server server;
private int serverPort;

@BeforeEach
void setUp() throws IOException {
MockitoAnnotations.openMocks(this);
serverPort = findFreePort();
final int serverPort = findFreePort();
server = ServerBuilder.forPort(serverPort)
.addService(new BlockStreamServiceGrpc.BlockStreamServiceImplBase() {
@Override
Expand Down Expand Up @@ -100,8 +99,8 @@ public void subscribeBlockStream(
when(grpcConfig.serverAddress()).thenReturn("localhost");
when(grpcConfig.port()).thenReturn(serverPort);

Configuration config = TestUtils.getTestConfiguration();
metricsService = new MetricsServiceImpl(getTestMetrics(config));
final Configuration config = TestUtils.getTestConfiguration();
final MetricsService metricsService = new MetricsServiceImpl(getTestMetrics(config));
consumerStreamGrpcClientImpl = new ConsumerStreamGrpcClientImpl(grpcConfig, metricsService);
consumerStreamGrpcClientImpl.init();
}
Expand Down Expand Up @@ -179,11 +178,4 @@ void shutdown() throws InterruptedException {
.toString(),
firstStatus);
}

private int findFreePort() throws IOException {
// Find a free port
try (ServerSocket socket = new ServerSocket(0)) {
return socket.getLocalPort();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@

package com.hedera.block.simulator.grpc.impl;

import static com.hedera.block.simulator.TestUtils.findFreePort;
import static com.hedera.block.simulator.TestUtils.getTestMetrics;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.*;

import com.hedera.block.simulator.TestUtils;
import com.hedera.block.simulator.config.data.BlockStreamConfig;
import com.hedera.block.simulator.config.data.GrpcConfig;
import com.hedera.block.simulator.grpc.PublishStreamGrpcClient;
import com.hedera.block.simulator.metrics.MetricsService;
import com.hedera.block.simulator.metrics.MetricsServiceImpl;
import com.hedera.hapi.block.protoc.*;
import com.hedera.hapi.block.stream.protoc.Block;
import com.hedera.hapi.block.stream.protoc.BlockItem;
import com.swirlds.config.api.Configuration;
Expand All @@ -39,34 +40,133 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class PublishStreamGrpcClientImplTest {

private MetricsService metricsService;
private PublishStreamGrpcClient publishStreamGrpcClient;
@Mock
private GrpcConfig grpcConfig;
private BlockStreamConfig blockStreamConfig;
private AtomicBoolean streamEnabled;
private Server server;

@BeforeEach
void setUp() throws IOException {
// MockitoAnnotations.openMocks(this);
//
// // Initialize the AtomicBoolean
// streamEnabled = new AtomicBoolean(true);
//
// // Start the server on a random port
final int serverPort = findFreePort();
// server = ServerBuilder.forPort(serverPort)
// .addService(new BlockStreamServiceGrpc.BlockStreamServiceImplBase() {
// @Override
// public StreamObserver<PublishStreamRequest> publishBlockStream(
// StreamObserver<PublishStreamResponse> responseObserver) {
//
// return new StreamObserver<PublishStreamRequest>() {
//
// private boolean firstBlockReceived = false;
// private long lastBlockNumber = 0;
//
// @Override
// public void onNext(PublishStreamRequest request) {
// if (!streamEnabled.get()) {
// // Send an error response
// PublishStreamResponse.EndOfStream endOfStream = PublishStreamResponse.EndOfStream.newBuilder()
// .setStatus(PublishStreamResponseCode.STREA)
// .setBlockNumber(lastBlockNumber)
// .build();
// responseObserver.onNext(PublishStreamResponse.newBuilder()
// .setStatus(endOfStream)
// .build());
// responseObserver.onCompleted();
// return;
// }
//
// BlockItemSet blockItems = request.getBlockItems();
// List<BlockItem> items = blockItems.getBlockItemsList();
//
// // Simulate processing of block items
// for (BlockItem item : items) {
// // Assume that the first BlockItem is a BlockHeader
// if (item.hasBlockHeader()) {
// firstBlockReceived = true;
// lastBlockNumber = item.getBlockHeader().getBlockNumber();
// }
// // Assume that the last BlockItem is a BlockProof
// if (item.hasBlockProof()) {
// // Send BlockAcknowledgement
// PublishStreamResponse.Acknowledgement acknowledgement =
// PublishStreamResponse.Acknowledgement.newBuilder()
// .setBlockAck(PublishStreamResponse.BlockAcknowledgement.newBuilder()
// .setBlockNumber(lastBlockNumber)
// .setBlockRootHash(item.getBlockProof().getMerkleRoot())
// .build())
// .build();
// responseObserver.onNext(PublishStreamResponse.newBuilder()
// .setAcknowledgement(acknowledgement)
// .build());
// }
// }
// }
//
// @Override
// public void onError(Throwable t) {
// // Handle error
// }
//
// @Override
// public void onCompleted() {
// // Send EndOfStream response
// PublishStreamResponse.EndOfStream endOfStream = PublishStreamResponse.EndOfStream.newBuilder()
// .setStatus(PublishStreamResponseCode.STREAM_ITEMS_SUCCESS)
// .setBlockNumber(lastBlockNumber)
// .build();
// responseObserver.onNext(PublishStreamResponse.newBuilder()
// .setStatus(endOfStream)
// .build());
// responseObserver.onCompleted();
// }
// };
// }
// })
// .build()
// .start();

grpcConfig = TestUtils.getTestConfiguration().getConfigData(GrpcConfig.class);
blockStreamConfig = TestUtils.getTestConfiguration(Map.of("blockStream.blockItemsBatchSize", "2"))
.getConfigData(BlockStreamConfig.class);

Configuration config = TestUtils.getTestConfiguration();
metricsService = new MetricsServiceImpl(getTestMetrics(config));
streamEnabled = new AtomicBoolean(true);

when(grpcConfig.serverAddress()).thenReturn("localhost");
when(grpcConfig.port()).thenReturn(serverPort);

publishStreamGrpcClient =
new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled);
}

@AfterEach
void teardown() {
// server.shutdownNow();
}

@Test
void streamBlockItem() throws InterruptedException {
BlockItem blockItem = BlockItem.newBuilder().build();
Expand Down

0 comments on commit d6d5deb

Please sign in to comment.