Skip to content

Commit

Permalink
fix: removed NotifierBuilder and set up dagger to use NotifierImpl
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Sep 18, 2024
1 parent 410284c commit dd5fa4b
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.hedera.block.server.mediator;

import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.notifier.Notifiable;
import com.hedera.block.server.service.ServiceStatus;
import com.hedera.hapi.block.SubscribeStreamResponse;
import dagger.Module;
Expand Down Expand Up @@ -56,4 +57,19 @@ static SubscriptionHandler<SubscribeStreamResponse> provideSubscriptionHandler(
@NonNull final ServiceStatus serviceStatus) {
return LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus).build();
}

/**
* Provides the mediator as a Notifiable instance.
*
* @param blockNodeContext the block node context
* @param serviceStatus the service status
* @return the mediator as a Notifiable instance
*/
@Provides
@Singleton
static Notifiable provideMediator(
@NonNull final BlockNodeContext blockNodeContext,
@NonNull final ServiceStatus serviceStatus) {
return LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus).build();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import static java.lang.System.Logger.Level.ERROR;

import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.events.BlockNodeEventHandler;
import com.hedera.block.server.events.ObjectEvent;
import com.hedera.block.server.mediator.SubscriptionHandlerBase;
import com.hedera.block.server.metrics.MetricsService;
import com.hedera.block.server.service.ServiceStatus;
Expand All @@ -34,10 +32,11 @@
import com.hedera.hapi.block.PublishStreamResponseCode;
import com.hedera.hapi.block.stream.BlockItem;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import com.lmax.disruptor.BatchEventProcessor;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.inject.Inject;
import javax.inject.Singleton;

/**
* Use NotifierImpl to mediate the stream of responses from the persistence layer back to multiple
Expand All @@ -48,26 +47,36 @@
* persists the block items to a store. It also notifies the mediator of critical system events and
* will stop the server in the event of an unrecoverable error.
*/
class NotifierImpl extends SubscriptionHandlerBase<PublishStreamResponse> implements Notifier {
@Singleton
public class NotifierImpl extends SubscriptionHandlerBase<PublishStreamResponse>
implements Notifier {

private final System.Logger LOGGER = System.getLogger(getClass().getName());

/** The initial capacity of producers in the subscriber map. */
private static final int SUBSCRIBER_INIT_CAPACITY = 5;

private final Notifiable mediator;
private final MetricsService metricsService;
private final ServiceStatus serviceStatus;

NotifierImpl(
@NonNull
final Map<
BlockNodeEventHandler<ObjectEvent<PublishStreamResponse>>,
BatchEventProcessor<ObjectEvent<PublishStreamResponse>>>
subscribers,
/**
* Constructs a new NotifierImpl instance with the given mediator, block node context, and
* service status.
*
* @param mediator the mediator to notify of critical system events
* @param blockNodeContext the block node context
* @param serviceStatus the service status to stop the service and web server if an exception
* occurs
*/
@Inject
public NotifierImpl(
@NonNull final Notifiable mediator,
@NonNull final BlockNodeContext blockNodeContext,
@NonNull final ServiceStatus serviceStatus) {

super(
subscribers,
new ConcurrentHashMap<>(SUBSCRIBER_INIT_CAPACITY),
blockNodeContext.metricsService().get(Producers),
blockNodeContext
.configuration()
Expand Down Expand Up @@ -129,7 +138,7 @@ public void publish(@NonNull BlockItem blockItem) {
}

@NonNull
public static PublishStreamResponse buildErrorStreamResponse() {
static PublishStreamResponse buildErrorStreamResponse() {
// TODO: Replace this with a real error enum.
final EndOfStream endOfStream =
EndOfStream.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,8 @@

package com.hedera.block.server.notifier;

import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.mediator.LiveStreamMediator;
import com.hedera.block.server.service.ServiceStatus;
import dagger.Binds;
import dagger.Module;
import dagger.Provides;
import edu.umd.cs.findbugs.annotations.NonNull;
import javax.inject.Singleton;

/** A Dagger module for providing dependencies for Notifier Module. */
Expand All @@ -31,17 +27,10 @@ public interface NotifierInjectionModule {
/**
* Provides the notifier.
*
* @param streamMediator the stream mediator
* @param blockNodeContext the block node context
* @param serviceStatus the service status
* @param notifier requires a notifier implementation
* @return the notifier
*/
@Provides
@Binds
@Singleton
static Notifier providesNotifier(
@NonNull final LiveStreamMediator streamMediator,
@NonNull final BlockNodeContext blockNodeContext,
@NonNull final ServiceStatus serviceStatus) {
return NotifierBuilder.newBuilder(streamMediator, blockNodeContext, serviceStatus).build();
}
Notifier bindNotifier(NotifierImpl notifier);
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import com.hedera.block.server.mediator.LiveStreamMediator;
import com.hedera.block.server.mediator.LiveStreamMediatorBuilder;
import com.hedera.block.server.notifier.Notifier;
import com.hedera.block.server.notifier.NotifierBuilder;
import com.hedera.block.server.notifier.NotifierImpl;
import com.hedera.block.server.persistence.StreamPersistenceHandlerImpl;
import com.hedera.block.server.persistence.storage.read.BlockReader;
import com.hedera.block.server.persistence.storage.write.BlockAsDirWriterBuilder;
Expand Down Expand Up @@ -165,8 +165,7 @@ public void testPublishBlockStreamRegistrationAndExecution()
final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext);
final var streamMediator =
LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus).build();
final var notifier =
NotifierBuilder.newBuilder(streamMediator, blockNodeContext, serviceStatus).build();
final var notifier = new NotifierImpl(streamMediator, blockNodeContext, serviceStatus);
final var blockNodeEventHandler =
new StreamPersistenceHandlerImpl(
streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus);
Expand Down Expand Up @@ -559,8 +558,7 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure() throws IOExcep
doThrow(IOException.class).when(blockWriter).write(blockItems.getFirst());

final var streamMediator = buildStreamMediator(consumers, serviceStatus);
final var notifier =
NotifierBuilder.newBuilder(streamMediator, blockNodeContext, serviceStatus).build();
final var notifier = new NotifierImpl(streamMediator, blockNodeContext, serviceStatus);
final var blockNodeEventHandler =
new StreamPersistenceHandlerImpl(
streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus);
Expand Down Expand Up @@ -726,8 +724,7 @@ private BlockStreamService buildBlockStreamService(final BlockWriter<BlockItem>

final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext);
final var streamMediator = buildStreamMediator(new ConcurrentHashMap<>(32), serviceStatus);
final var notifier =
NotifierBuilder.newBuilder(streamMediator, blockNodeContext, serviceStatus).build();
final var notifier = new NotifierImpl(streamMediator, blockNodeContext, serviceStatus);
final var blockNodeEventHandler =
new StreamPersistenceHandlerImpl(
streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import com.hedera.block.server.events.ObjectEvent;
import com.hedera.block.server.metrics.BlockNodeMetricTypes;
import com.hedera.block.server.notifier.Notifier;
import com.hedera.block.server.notifier.NotifierBuilder;
import com.hedera.block.server.notifier.NotifierImpl;
import com.hedera.block.server.persistence.StreamPersistenceHandlerImpl;
import com.hedera.block.server.persistence.storage.write.BlockWriter;
import com.hedera.block.server.service.ServiceStatus;
Expand Down Expand Up @@ -428,8 +428,7 @@ public void testMediatorBlocksPublishAfterException() throws IOException, Interr
streamMediator.subscribe(concreteObserver2);
streamMediator.subscribe(concreteObserver3);

final Notifier notifier =
NotifierBuilder.newBuilder(streamMediator, blockNodeContext, serviceStatus).build();
final Notifier notifier = new NotifierImpl(streamMediator, blockNodeContext, serviceStatus);
final var streamValidator =
new StreamPersistenceHandlerImpl(
streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;

import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.notifier.Notifiable;
import com.hedera.block.server.service.ServiceStatus;
import com.hedera.block.server.util.TestConfigUtil;
import com.hedera.hapi.block.SubscribeStreamResponse;
Expand Down Expand Up @@ -65,4 +66,17 @@ void testProvidesSubscriptionHandler() throws IOException {
// Verify that the subscriptionHandler is correctly instantiated
assertNotNull(subscriptionHandler);
}

@Test
void testProvidesMediator() throws IOException {

BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext();

// Call the method under test
Notifiable mediator =
MediatorInjectionModule.provideMediator(blockNodeContext, serviceStatus);

// Verify that the mediator is correctly instantiated
assertNotNull(mediator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import static org.mockito.Mockito.when;

import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.events.BlockNodeEventHandler;
import com.hedera.block.server.events.ObjectEvent;
import com.hedera.block.server.mediator.Publisher;
import com.hedera.block.server.mediator.SubscriptionHandler;
import com.hedera.block.server.producer.ProducerBlockItemObserver;
Expand All @@ -39,7 +37,6 @@
import com.hedera.hapi.block.Acknowledgement;
import com.hedera.hapi.block.PublishStreamResponse;
import com.hedera.hapi.block.stream.BlockItem;
import com.lmax.disruptor.BatchEventProcessor;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
Expand Down Expand Up @@ -89,8 +86,7 @@ public NotifierImplTest() throws IOException {
public void testRegistration() throws NoSuchAlgorithmException {

final ServiceStatus serviceStatus = new ServiceStatusImpl(testContext);
final var notifier =
NotifierBuilder.newBuilder(mediator, testContext, serviceStatus).build();
final var notifier = new NotifierImpl(mediator, testContext, serviceStatus);

when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS);

Expand Down Expand Up @@ -172,8 +168,7 @@ public void testTimeoutExpiredHandling() throws InterruptedException {

when(serviceStatus.isRunning()).thenReturn(true);

final var notifier =
NotifierBuilder.newBuilder(mediator, testContext, serviceStatus).build();
final var notifier = new NotifierImpl(mediator, testContext, serviceStatus);

// Set the clocks to be expired
final InstantSource testClock1 = mock(InstantSource.class);
Expand Down Expand Up @@ -246,8 +241,7 @@ public void testTimeoutExpiredHandling() throws InterruptedException {
public void testPublishThrowsNoSuchAlgorithmException() {

when(serviceStatus.isRunning()).thenReturn(true);
final var notifier =
new TestNotifier(new HashMap<>(), mediator, testContext, serviceStatus);
final var notifier = new TestNotifier(mediator, testContext, serviceStatus);
final var concreteObserver1 =
new ProducerBlockItemObserver(
testClock,
Expand Down Expand Up @@ -303,8 +297,7 @@ public void testServiceStatusNotRunning() throws NoSuchAlgorithmException {

// Set the serviceStatus to not running
when(serviceStatus.isRunning()).thenReturn(false);
final var notifier =
new TestNotifier(new HashMap<>(), mediator, testContext, serviceStatus);
final var notifier = new TestNotifier(mediator, testContext, serviceStatus);
final var concreteObserver1 =
new ProducerBlockItemObserver(
testClock,
Expand Down Expand Up @@ -364,15 +357,10 @@ public void testServiceStatusNotRunning() throws NoSuchAlgorithmException {

private static final class TestNotifier extends NotifierImpl {
public TestNotifier(
@NonNull
final Map<
BlockNodeEventHandler<ObjectEvent<PublishStreamResponse>>,
BatchEventProcessor<ObjectEvent<PublishStreamResponse>>>
subscribers,
@NonNull final Notifiable mediator,
@NonNull final BlockNodeContext blockNodeContext,
@NonNull final ServiceStatus serviceStatus) {
super(subscribers, mediator, blockNodeContext, serviceStatus);
super(mediator, blockNodeContext, serviceStatus);
}

@Override
Expand Down
Loading

0 comments on commit dd5fa4b

Please sign in to comment.