Skip to content

Commit

Permalink
fix: javadoc
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Sep 17, 2024
1 parent c987147 commit 410284c
Show file tree
Hide file tree
Showing 15 changed files with 117 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,24 @@ static ConsumerConfig provideConsumerConfig(Configuration configuration) {
return configuration.getConfigData(ConsumerConfig.class);
}

/**
* Provides a mediator configuration singleton using the configuration.
*
* @param configuration is the configuration singleton
* @return a mediator configuration singleton
*/
@Singleton
@Provides
static MediatorConfig provideMediatorConfig(Configuration configuration) {
return configuration.getConfigData(MediatorConfig.class);
}

/**
* Provides a notifier configuration singleton using the configuration.
*
* @param configuration is the configuration singleton
* @return a notifier configuration singleton
*/
@Singleton
@Provides
static NotifierConfig provideNotifierConfig(Configuration configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
* @param <V> the type of the event value
*/
public interface BlockNodeEventHandler<V> extends EventHandler<V> {

/**
* Use this method to check if the underlying event handler is timed out.
*
* @return true if the timeout has expired, false otherwise
*/
default boolean isTimeoutExpired() {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,17 @@

package com.hedera.block.server.exception;

/**
* Use this checked exception to represent a Block Node protocol exception encountered while
* processing block items.
*/
public class BlockStreamProtocolException extends Exception {

/**
* Constructs a new exception with the specified detail message.
*
* @param message the detail message
*/
public BlockStreamProtocolException(String message) {
super(message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@
import java.util.Map;

/**
* LiveStreamMediatorImpl is an implementation of the StreamMediator interface. It is responsible
* for managing the subscribe and unsubscribe operations of downstream consumers. It also proxies
* block items to the subscribers as they arrive via a RingBuffer and persists the block items to a
* store.
* Use LiveStreamMediatorImpl to mediate the live stream of blocks from a producer to multiple
* consumers.
*
* <p>As an implementation of the StreamMediator interface, it proxies block items to the
* subscribers as they arrive via a RingBuffer maintained in the base class and persists the block
* items to a store.
*/
class LiveStreamMediatorImpl extends SubscriptionHandlerBase<SubscribeStreamResponse>
implements LiveStreamMediator {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ static LiveStreamMediator providesLiveStreamMediator(
return LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus).build();
}

/**
* Provides the subscription handler.
*
* @param blockNodeContext the block node context
* @param serviceStatus the service status
* @return the subscription handler
*/
@Provides
@Singleton
static SubscriptionHandler<SubscribeStreamResponse> provideSubscriptionHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,20 @@
/**
* Inherit from this class to leverage RingBuffer subscription handling.
*
* <p>Subclasses may use the ringBuffer to publish events to the subscribers.
* <p>Subclasses may use the ringBuffer to publish events to the subscribers. This base class
* contains the logic to manage subscriptions to the ring buffer.
*
* @param <V> the type of the subscription events
*/
public abstract class SubscriptionHandlerBase<V> implements SubscriptionHandler<V> {

private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final Map<BlockNodeEventHandler<ObjectEvent<V>>, BatchEventProcessor<ObjectEvent<V>>>
subscribers;

private final LongGauge subscriptionGauge;
/** The ring buffer to publish events to the subscribers. */
protected final RingBuffer<ObjectEvent<V>> ringBuffer;

private final LongGauge subscriptionGauge;
private final ExecutorService executor;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@

package com.hedera.block.server.notifier;

/** Use this interface contract to notify the implementation of critical system events. */
public interface Notifiable {

/**
* This method is called to notify of an unrecoverable error and the system will be shut down.
*/
void notifyUnrecoverableError();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,8 @@
import com.hedera.hapi.block.PublishStreamResponse;
import com.hedera.hapi.block.stream.BlockItem;

/**
* Use this interface to combine the contract for streaming block items with the contract to be
* notified of critical system events.
*/
public interface Notifier extends StreamMediator<BlockItem, PublishStreamResponse>, Notifiable {}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Use builder methods to create a {@link Notifier} to handle live stream response events from the
* persistence layer to N producers.
*
* <p>When a stream mediator is created, it will accept block item responses from the persistence
* layer and publish them to all producers subscribed to the stream
*/
public class NotifierBuilder {

private final Notifiable mediator;
Expand All @@ -51,6 +58,16 @@ private NotifierBuilder(
this.serviceStatus = serviceStatus;
}

/**
* Create a new instance of the builder using the minimum required parameters.
*
* @param mediator is required to provide notification of critical system events.
* @param blockNodeContext is required to provide metrics reporting mechanisms to the stream
* mediator.
* @param serviceStatus is required to provide the stream mediator with access to check the
* status of the server and to stop the web server if necessary.
* @return a new stream mediator builder configured with required parameters.
*/
@NonNull
public static NotifierBuilder newBuilder(
@NonNull final Notifiable mediator,
Expand All @@ -59,6 +76,13 @@ public static NotifierBuilder newBuilder(
return new NotifierBuilder(mediator, blockNodeContext, serviceStatus);
}

/**
* Use the build method to construct a notifier to handle live stream response events from the
* persistence layer to N producers.
*
* @return the notifier to handle live stream response events between they persistence layer
* producer and N producers.
*/
@NonNull
public Notifier build() {
return new NotifierImpl(subscribers, mediator, blockNodeContext, serviceStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@
import java.security.NoSuchAlgorithmException;
import java.util.Map;

/**
* Use NotifierImpl to mediate the stream of responses from the persistence layer back to multiple
* producers.
*
* <p>As an implementation of the StreamMediator interface, it proxies block item persistence
* responses back to the producers as they arrive via a RingBuffer maintained in the base class and
* persists the block items to a store. It also notifies the mediator of critical system events and
* will stop the server in the event of an unrecoverable error.
*/
class NotifierImpl extends SubscriptionHandlerBase<PublishStreamResponse> implements Notifier {

private final System.Logger LOGGER = System.getLogger(getClass().getName());
Expand Down Expand Up @@ -83,6 +92,12 @@ public void notifyUnrecoverableError() {
serviceStatus.stopWebServer(getClass().getName());
}

/**
* Publishes the given block item to all subscribed producers.
*
* @param blockItem the block item from the persistence layer to publish a response to upstream
* producers
*/
@Override
public void publish(@NonNull BlockItem blockItem) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@
@Module
public interface NotifierInjectionModule {

/**
* Provides the notifier.
*
* @param streamMediator the stream mediator
* @param blockNodeContext the block node context
* @param serviceStatus the service status
* @return the notifier
*/
@Provides
@Singleton
static Notifier providesNotifier(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ static BlockReader<Block> providesBlockReader(PersistenceStorageConfig config) {
return BlockAsDirReaderBuilder.newBuilder(config).build();
}

/**
* Binds the block node event handler to the stream persistence handler.
*
* @param streamPersistenceHandler the stream persistence handler
* @return the block node event handler
*/
@Binds
@Singleton
BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponse>> bindBlockNodeEventHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public interface BlockWriter<V> {
* Write the block item to storage.
*
* @param blockItem the block item to write to storage.
* @return an optional containing the block item written to storage if the block item was a
* block proof signaling the end of the block, an empty optional otherwise.
* @throws IOException when failing to write the block item to storage.
*/
Optional<V> write(@NonNull final V blockItem) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public class ProducerBlockItemObserver
* the upstream producer for each block item processed.
* @param blockNodeContext the block node context used to access context objects for the Block
* Node (e.g. - the metrics service).
* @param serviceStatus the service status used to stop the server in the event of an
* unrecoverable error.
*/
public ProducerBlockItemObserver(
@NonNull final InstantSource producerLivenessClock,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ public class ServiceStatusImpl implements ServiceStatus {

private final int delayMillis;

/** Constructor for the ServiceStatusImpl class. */
/**
* Use the ServiceStatusImpl to check the status of the block node server and to shut it down if
* necessary.
*
* @param blockNodeContext the block node context
*/
@Inject
public ServiceStatusImpl(@NonNull final BlockNodeContext blockNodeContext) {
this.delayMillis =
Expand Down

0 comments on commit 410284c

Please sign in to comment.