Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
Signed-off-by: georgi-l95 <[email protected]>
  • Loading branch information
georgi-l95 committed Dec 3, 2024
1 parent b6e5369 commit f50aa4e
Show file tree
Hide file tree
Showing 11 changed files with 14 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,15 @@
* execution, and shutdown of streaming operations based on the configured mode.
*/
public class BlockStreamSimulatorApp {
/** Logger for this class */
private final System.Logger LOGGER = System.getLogger(getClass().getName());

// Service dependencies
private final PublishStreamGrpcClient publishStreamGrpcClient;
private final ConsumerStreamGrpcClient consumerStreamGrpcClient;
private final SimulatorModeHandler simulatorModeHandler;
private final MetricsService metricsService;

// State
private final AtomicBoolean isRunning;
private final AtomicBoolean isRunning = new AtomicBoolean(false);

/**
* Creates a new BlockStreamSimulatorApp instance with the specified dependencies.
Expand All @@ -80,10 +78,8 @@ public BlockStreamSimulatorApp(
requireNonNull(configuration);
requireNonNull(blockStreamManager);

this.metricsService = requireNonNull(metricsService);
this.publishStreamGrpcClient = requireNonNull(publishStreamGrpcClient);
this.consumerStreamGrpcClient = requireNonNull(consumerStreamGrpcClient);
this.isRunning = new AtomicBoolean(false);

// Initialize the appropriate mode handler based on configuration
final BlockStreamConfig blockStreamConfig =
Expand All @@ -92,9 +88,8 @@ public BlockStreamSimulatorApp(
this.simulatorModeHandler = switch (simulatorMode) {
case PUBLISHER -> new PublisherModeHandler(
blockStreamConfig, publishStreamGrpcClient, blockStreamManager, metricsService);
case CONSUMER -> new ConsumerModeHandler(blockStreamConfig, consumerStreamGrpcClient);
case BOTH -> new CombinedModeHandler(blockStreamConfig);
default -> throw new IllegalArgumentException("Unknown SimulatorMode: " + simulatorMode);
case CONSUMER -> new ConsumerModeHandler(consumerStreamGrpcClient);
case BOTH -> new CombinedModeHandler();
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,6 @@ public BlockAsFileLargeDataSets(@NonNull BlockGeneratorConfig config) {
this.formatString = "%0" + config.paddedLength() + "d" + config.fileExtension();
}

/**
* Initialize the block stream manager and load blocks into memory.
*/
@Override
public void init() {
// Do nothing, because we don't have real initializing and loading blocks into memory for this implementation.
}

@Override
public GenerationMode getGenerationMode() {
return GenerationMode.DIR;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public interface BlockStreamManager {
/**
* Initialize the block stream manager and load blocks into memory.
*/
void init();
default void init() {}

/**
* Get the generation mode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
* This class processes incoming blocks and status messages, updating metrics accordingly.
*/
public class ConsumerStreamObserver implements StreamObserver<SubscribeStreamResponse> {

/** Logger for this class */
private final System.Logger LOGGER = System.getLogger(getClass().getName());

// Service dependencies
Expand Down Expand Up @@ -105,8 +103,10 @@ public void onCompleted() {
}

private void processBlockItems(List<BlockItem> blockItems) {
blockItems.stream()
.filter(BlockItem::hasBlockProof)
.forEach(__ -> metricsService.get(LiveBlocksConsumed).increment());
blockItems.stream().filter(BlockItem::hasBlockProof).forEach(blockItem -> {
metricsService.get(LiveBlocksConsumed).increment();
LOGGER.log(
INFO, "Received block number: " + blockItem.getBlockProof().getBlock());
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
* block chunking, and tracks metrics related to block publication.
*/
public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient {
/** Logger for this class */
private final System.Logger LOGGER = System.getLogger(getClass().getName());

// Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
* This class processes server responses and manages the stream state based on server feedback.
*/
public class PublishStreamObserver implements StreamObserver<PublishStreamResponse> {

/** Logger for this class */
private final System.Logger LOGGER = System.getLogger(getClass().getName());

// State
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@

package com.hedera.block.simulator.mode;

import static java.util.Objects.requireNonNull;

import com.hedera.block.simulator.config.data.BlockStreamConfig;
import edu.umd.cs.findbugs.annotations.NonNull;

/**
* The {@code CombinedModeHandler} class implements the {@link SimulatorModeHandler} interface
Expand All @@ -34,17 +31,11 @@
* {@link UnsupportedOperationException}.
*/
public class CombinedModeHandler implements SimulatorModeHandler {
private final BlockStreamConfig blockStreamConfig;

/**
* Constructs a new {@code CombinedModeHandler} with the specified configuration.
*
* @param blockStreamConfig The configuration for block streaming parameters
* @throws NullPointerException if blockStreamConfig is null
*/
public CombinedModeHandler(@NonNull final BlockStreamConfig blockStreamConfig) {
this.blockStreamConfig = requireNonNull(blockStreamConfig);
}
public CombinedModeHandler() {}

/**
* Initializes resources for both consuming and publishing blocks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,18 @@
* {@link UnsupportedOperationException}.
*/
public class ConsumerModeHandler implements SimulatorModeHandler {
/** Logger for this class */
private final System.Logger LOGGER = System.getLogger(getClass().getName());

// Configuration
private final BlockStreamConfig blockStreamConfig;

// Service dependencies
private final ConsumerStreamGrpcClient consumerStreamGrpcClient;

/**
* Constructs a new {@code ConsumerModeHandler} with the specified dependencies.
*
* @param blockStreamConfig The configuration for block streaming parameters
* @param consumerStreamGrpcClient The client for consuming blocks via gRPC
* @throws NullPointerException if any parameter is null
*/
public ConsumerModeHandler(
@NonNull final BlockStreamConfig blockStreamConfig,
@NonNull final ConsumerStreamGrpcClient consumerStreamGrpcClient) {
this.blockStreamConfig = requireNonNull(blockStreamConfig);
public ConsumerModeHandler(@NonNull final ConsumerStreamGrpcClient consumerStreamGrpcClient) {
this.consumerStreamGrpcClient = requireNonNull(consumerStreamGrpcClient);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
* the simulator needs to handle publication of blocks.
*/
public class PublisherModeHandler implements SimulatorModeHandler {
/** Logger for this class */
private final System.Logger LOGGER = System.getLogger(getClass().getName());

// Configuration fields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,15 @@

import static org.junit.jupiter.api.Assertions.assertThrows;

import com.hedera.block.simulator.config.data.BlockStreamConfig;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

public class CombinedModeHandlerTest {

@Mock
private BlockStreamConfig blockStreamConfig;

private CombinedModeHandler combinedModeHandler;

@Test
void testStartThrowsUnsupportedOperationException() {
MockitoAnnotations.openMocks(this);
combinedModeHandler = new CombinedModeHandler(blockStreamConfig);
combinedModeHandler = new CombinedModeHandler();

assertThrows(UnsupportedOperationException.class, () -> combinedModeHandler.start());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,25 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.*;

import com.hedera.block.simulator.config.data.BlockStreamConfig;
import com.hedera.block.simulator.grpc.ConsumerStreamGrpcClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class ConsumerModeHandlerTest {

private BlockStreamConfig blockStreamConfig;
private ConsumerStreamGrpcClient consumerStreamGrpcClient;
private ConsumerModeHandler consumerModeHandler;

@BeforeEach
void setUp() {
blockStreamConfig = mock(BlockStreamConfig.class);
consumerStreamGrpcClient = mock(ConsumerStreamGrpcClient.class);

consumerModeHandler = new ConsumerModeHandler(blockStreamConfig, consumerStreamGrpcClient);
consumerModeHandler = new ConsumerModeHandler(consumerStreamGrpcClient);
}

@Test
void testConstructorWithNullArguments() {
assertThrows(NullPointerException.class, () -> new ConsumerModeHandler(null, consumerStreamGrpcClient));
assertThrows(NullPointerException.class, () -> new ConsumerModeHandler(blockStreamConfig, null));
assertThrows(NullPointerException.class, () -> new ConsumerModeHandler(null));
}

@Test
Expand Down

0 comments on commit f50aa4e

Please sign in to comment.