Skip to content

Commit

Permalink
chore: create state signature collector component (#10838)
Browse files Browse the repository at this point in the history
Signed-off-by: Lazar Petrovic <[email protected]>
Signed-off-by: Cody Littley <[email protected]>
Co-authored-by: Cody Littley <[email protected]>
Co-authored-by: Maxi Tartaglia <[email protected]>
  • Loading branch information
3 people authored Jan 16, 2024
1 parent 3467288 commit e54c29c
Show file tree
Hide file tree
Showing 52 changed files with 1,088 additions and 1,636 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@
import com.swirlds.platform.components.appcomm.AppCommunicationComponent;
import com.swirlds.platform.components.state.DefaultStateManagementComponent;
import com.swirlds.platform.components.state.StateManagementComponent;
import com.swirlds.platform.components.state.output.NewLatestCompleteStateConsumer;
import com.swirlds.platform.components.transaction.system.ConsensusSystemTransactionManager;
import com.swirlds.platform.components.transaction.system.PreconsensusSystemTransactionManager;
import com.swirlds.platform.config.ThreadConfig;
import com.swirlds.platform.consensus.ConsensusConfig;
import com.swirlds.platform.consensus.NonAncientEventWindow;
Expand Down Expand Up @@ -111,6 +109,7 @@
import com.swirlds.platform.gossip.shadowgraph.ShadowGraphEventObserver;
import com.swirlds.platform.gossip.sync.config.SyncConfig;
import com.swirlds.platform.gui.GuiPlatformAccessor;
import com.swirlds.platform.internal.ConsensusRound;
import com.swirlds.platform.internal.EventImpl;
import com.swirlds.platform.listeners.PlatformStatusChangeListener;
import com.swirlds.platform.listeners.PlatformStatusChangeNotification;
Expand Down Expand Up @@ -138,16 +137,17 @@
import com.swirlds.platform.state.iss.IssScratchpad;
import com.swirlds.platform.state.nexus.EmergencyStateNexus;
import com.swirlds.platform.state.nexus.LatestCompleteStateNexus;
import com.swirlds.platform.state.nexus.LockFreeStateNexus;
import com.swirlds.platform.state.nexus.SignedStateNexus;
import com.swirlds.platform.state.signed.ReservedSignedState;
import com.swirlds.platform.state.signed.SavedStateInfo;
import com.swirlds.platform.state.signed.SignedState;
import com.swirlds.platform.state.signed.SignedStateFileManager;
import com.swirlds.platform.state.signed.SignedStateManager;
import com.swirlds.platform.state.signed.SignedStateMetrics;
import com.swirlds.platform.state.signed.SourceOfSignedState;
import com.swirlds.platform.state.signed.StartupStateUtils;
import com.swirlds.platform.state.signed.StateDumpRequest;
import com.swirlds.platform.state.signed.StateSignatureCollector;
import com.swirlds.platform.state.signed.StateToDiskReason;
import com.swirlds.platform.stats.StatConstructor;
import com.swirlds.platform.system.InitTrigger;
Expand Down Expand Up @@ -181,6 +181,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -233,7 +234,7 @@ public class SwirldsPlatform implements Platform {
* NOTE: This is currently set when a state has finished hashing. In the future, this will be set at the moment a
* new state is created, before it is hashed.
*/
private final SignedStateNexus latestImmutableState = new SignedStateNexus();
private final SignedStateNexus latestImmutableState = new LockFreeStateNexus();

private final QueueThread<GossipEvent> intakeQueue;

Expand Down Expand Up @@ -524,32 +525,29 @@ public class SwirldsPlatform implements Platform {
swirldName);

transactionPool = new TransactionPool(platformContext);
final LatestCompleteStateNexus latestCompleteState =
new LatestCompleteStateNexus(stateConfig, platformContext.getMetrics());

// FUTURE WORK: at some point this should be part of the unified platform wiring
final WiringModel model = WiringModel.create(platformContext, Time.getCurrent());
components.add(model);

platformWiring = components.add(new PlatformWiring(platformContext, time));

final LatestCompleteStateNexus latestCompleteState =
new LatestCompleteStateNexus(stateConfig, platformContext.getMetrics());
savedStateController = new SavedStateController(stateConfig);
final NewLatestCompleteStateConsumer newLatestCompleteStateConsumer = ss -> {
// the app comm component will reserve the state, this should be done by the wiring in the future
appCommunicationComponent.newLatestCompleteStateEvent(ss);
// the nexus expects a state to be reserved for it
// in the future, all of these reservations will be done by the wiring
latestCompleteState.setState(ss.reserve("setting latest complete state"));
};

final SignedStateMetrics signedStateMetrics = new SignedStateMetrics(platformContext.getMetrics());
final StateSignatureCollector stateSignatureCollector = new StateSignatureCollector(
platformContext.getConfiguration().getConfigData(StateConfig.class), signedStateMetrics);

stateManagementComponent = new DefaultStateManagementComponent(
platformContext,
threadManager,
dispatchBuilder,
newLatestCompleteStateConsumer,
this::handleFatalError,
platformWiring.getSaveStateToDiskInput()::put,
platformWiring.getSignStateInput()::put);
platformWiring.getSignStateInput()::put,
platformWiring.getSignatureCollectorStateInput()::put,
signedStateMetrics);

final EventHasher eventHasher = new EventHasher(platformContext);
final StateSigner stateSigner = new StateSigner(new PlatformSigner(keysAndCerts), platformStatusManager);
Expand All @@ -574,22 +572,16 @@ public class SwirldsPlatform implements Platform {

components.add(stateManagementComponent);

final SignedStateManager signedStateManager = stateManagementComponent.getSignedStateManager();

final PreconsensusSystemTransactionManager preconsensusSystemTransactionManager =
new PreconsensusSystemTransactionManager();
preconsensusSystemTransactionManager.addHandler(
StateSignatureTransaction.class, signedStateManager::handlePreconsensusSignatureTransaction);

final ConsensusSystemTransactionManager consensusSystemTransactionManager =
new ConsensusSystemTransactionManager();
consensusSystemTransactionManager.addHandler(
StateSignatureTransaction.class,
(ignored, nodeId, txn, v) ->
consensusHashManager.handlePostconsensusSignatureTransaction(nodeId, txn, v));
consensusSystemTransactionManager.addHandler(
StateSignatureTransaction.class,
(ignored, nodeId, txn, v) -> signedStateManager.handlePostconsensusSignatureTransaction(nodeId, txn));
final BiConsumer<State, ConsensusRound> roundAndStateConsumer = (state, round) -> {
consensusSystemTransactionManager.handleRound(state, round);
platformWiring.getSignatureCollectorConsensusInput().put(round);
};

// FUTURE WORK remove this when there are no more ShutdownRequestedTriggers being dispatched
components.add(new Shutdown());
Expand Down Expand Up @@ -625,8 +617,7 @@ public class SwirldsPlatform implements Platform {
platformContext,
currentAddressBook,
selfId,
preconsensusSystemTransactionManager,
consensusSystemTransactionManager,
roundAndStateConsumer,
new SwirldStateMetrics(platformContext.getMetrics()),
platformStatusManager,
initialState.getState(),
Expand Down Expand Up @@ -729,7 +720,7 @@ public class SwirldsPlatform implements Platform {
eventCreationManager,
sequencer,
swirldStateManager,
signedStateManager);
stateSignatureCollector);

intakeHandler = platformWiring.getEventInput()::put;

Expand All @@ -743,7 +734,8 @@ public class SwirldsPlatform implements Platform {
.setMetricsConfiguration(new QueueThreadMetricsConfiguration(metrics).enableMaxSizeMetric())
.build());

platformWiring.wireExternalComponents(platformStatusManager, appCommunicationComponent, transactionPool);
platformWiring.wireExternalComponents(
platformStatusManager, appCommunicationComponent, transactionPool, latestCompleteState);

transactionSubmitter = new SwirldTransactionSubmitter(
platformStatusManager::getCurrentStatus,
Expand Down Expand Up @@ -834,9 +826,9 @@ public class SwirldsPlatform implements Platform {

// To be removed once the GUI component is better integrated with the platform.
GuiPlatformAccessor.getInstance().setShadowGraph(selfId, shadowGraph);
GuiPlatformAccessor.getInstance().setStateManagementComponent(selfId, stateManagementComponent);
GuiPlatformAccessor.getInstance().setConsensusReference(selfId, consensusRef);
GuiPlatformAccessor.getInstance().setLatestCompleteStateComponent(selfId, latestCompleteState);
GuiPlatformAccessor.getInstance().setLatestImmutableStateComponent(selfId, latestImmutableState);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@
import com.swirlds.common.threading.framework.config.QueueThreadConfiguration;
import com.swirlds.platform.components.PlatformComponent;
import com.swirlds.platform.components.state.output.IssConsumer;
import com.swirlds.platform.components.state.output.NewLatestCompleteStateConsumer;
import com.swirlds.platform.consensus.ConsensusConstants;
import com.swirlds.platform.listeners.StateWriteToDiskCompleteListener;
import com.swirlds.platform.listeners.StateWriteToDiskCompleteNotification;
import com.swirlds.platform.state.signed.ReservedSignedState;
import com.swirlds.platform.state.signed.SignedState;
import com.swirlds.platform.state.signed.StateSavingResult;
import com.swirlds.platform.stats.AverageAndMax;
import com.swirlds.platform.stats.AverageStat;
Expand All @@ -48,12 +47,14 @@
/**
* This component responsible for notifying the application of various platform events
*/
public class AppCommunicationComponent implements PlatformComponent, NewLatestCompleteStateConsumer, IssConsumer {
public class AppCommunicationComponent implements PlatformComponent, IssConsumer {
private static final Logger logger = LogManager.getLogger(AppCommunicationComponent.class);

private final NotificationEngine notificationEngine;
/** A queue thread that asynchronously invokes NewLatestCompleteStateConsumers */
private final QueueThread<ReservedSignedState> asyncLatestCompleteStateQueue;
/** The round of the latest state provided to the application */
private long latestStateProvidedRound = ConsensusConstants.ROUND_UNDEFINED;
/**
* The size of the queue holding tasks for
* {@link com.swirlds.platform.components.state.output.NewLatestCompleteStateConsumer}s
Expand Down Expand Up @@ -100,20 +101,17 @@ public void stateSavedToDisk(@NonNull final StateSavingResult stateSavingResult)
stateSavingResult.freezeState()));
}

@Override
public void newLatestCompleteStateEvent(@NonNull final SignedState signedState) {
// the state is reserved now before it is added to the queue
public void newLatestCompleteStateEvent(@NonNull final ReservedSignedState reservedSignedState) {
// the state is reserved by the caller
// it will be released by the notification engine after the app consumes it
// this is done by latestCompleteStateAppNotify()
// if the state does not make into the queue, it will be released below
final ReservedSignedState reservedSignedState =
signedState.reserve("AppCommunicationComponent newLatestCompleteStateEvent");
final boolean success = asyncLatestCompleteStateQueue.offer(reservedSignedState);
if (!success) {
logger.error(
EXCEPTION.getMarker(),
"Unable to add new latest complete state task (state round = {}) to {} because it is full",
signedState.getRound(),
reservedSignedState.get().getRound(),
asyncLatestCompleteStateQueue.getName());
reservedSignedState.close();
}
Expand All @@ -123,6 +121,12 @@ public void newLatestCompleteStateEvent(@NonNull final SignedState signedState)
* Handler for {@link #asyncLatestCompleteStateQueue}
*/
private void latestCompleteStateHandler(@NonNull final ReservedSignedState reservedSignedState) {
if (reservedSignedState.get().getRound() <= latestStateProvidedRound) {
// this state is older than the latest state provided to the application, no need to notify
reservedSignedState.close();
return;
}
latestStateProvidedRound = reservedSignedState.get().getRound();
final NewSignedStateNotification notification = new NewSignedStateNotification(
reservedSignedState.get().getSwirldState(),
reservedSignedState.get().getState().getPlatformState(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,17 @@
import com.swirlds.common.context.PlatformContext;
import com.swirlds.common.threading.manager.ThreadManager;
import com.swirlds.platform.components.common.output.FatalErrorConsumer;
import com.swirlds.platform.components.state.output.NewLatestCompleteStateConsumer;
import com.swirlds.platform.dispatch.DispatchBuilder;
import com.swirlds.platform.dispatch.triggers.flow.StateHashedTrigger;
import com.swirlds.platform.state.signed.ReservedSignedState;
import com.swirlds.platform.state.signed.SignedState;
import com.swirlds.platform.state.signed.SignedStateGarbageCollector;
import com.swirlds.platform.state.signed.SignedStateHasher;
import com.swirlds.platform.state.signed.SignedStateInfo;
import com.swirlds.platform.state.signed.SignedStateManager;
import com.swirlds.platform.state.signed.SignedStateMetrics;
import com.swirlds.platform.state.signed.SignedStateSentinel;
import com.swirlds.platform.state.signed.SourceOfSignedState;
import com.swirlds.platform.util.HashLogger;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

Expand All @@ -54,11 +50,6 @@ public class DefaultStateManagementComponent implements StateManagementComponent
*/
private final SignedStateHasher signedStateHasher;

/**
* Keeps track of various signed states in various stages of collecting signatures
*/
private final SignedStateManager signedStateManager;

/**
* A logger for hash stream data
*/
Expand All @@ -68,66 +59,46 @@ public class DefaultStateManagementComponent implements StateManagementComponent
* Used to track signed state leaks, if enabled
*/
private final SignedStateSentinel signedStateSentinel;

private final Consumer<ReservedSignedState> stateFileManager;

/** signs a state */
private final Consumer<ReservedSignedState> stateSigner;
/** collects signatures for a state */
private final Consumer<ReservedSignedState> sigCollector;

/**
* @param platformContext the platform context
* @param threadManager manages platform thread resources
* @param dispatchBuilder builds dispatchers. This is deprecated, do not wire new things together
* with this.
* @param newLatestCompleteStateConsumer consumer to invoke when there is a new latest complete signed state
* @param fatalErrorConsumer consumer to invoke when a fatal error has occurred
* @param stateFileManager writes states to disk
* @param platformContext the platform context
* @param threadManager manages platform thread resources
* @param dispatchBuilder builds dispatchers. This is deprecated, do not wire new things together with this.
* @param fatalErrorConsumer consumer to invoke when a fatal error has occurred
* @param stateSigner signs a state
* @param sigCollector collects signatures for a state
* @param signedStateMetrics metrics about signed states
*/
public DefaultStateManagementComponent(
@NonNull final PlatformContext platformContext,
@NonNull final ThreadManager threadManager,
@NonNull final DispatchBuilder dispatchBuilder,
@NonNull final NewLatestCompleteStateConsumer newLatestCompleteStateConsumer,
@NonNull final FatalErrorConsumer fatalErrorConsumer,
@NonNull final Consumer<ReservedSignedState> stateFileManager,
@NonNull final Consumer<ReservedSignedState> stateSigner) {
@NonNull final Consumer<ReservedSignedState> stateSigner,
@NonNull final Consumer<ReservedSignedState> sigCollector,
@NonNull final SignedStateMetrics signedStateMetrics) {

Objects.requireNonNull(platformContext);
Objects.requireNonNull(threadManager);
Objects.requireNonNull(newLatestCompleteStateConsumer);
Objects.requireNonNull(fatalErrorConsumer);

// Various metrics about signed states
final SignedStateMetrics signedStateMetrics = new SignedStateMetrics(platformContext.getMetrics());

this.signedStateGarbageCollector = new SignedStateGarbageCollector(threadManager, signedStateMetrics);
this.signedStateSentinel = new SignedStateSentinel(platformContext, threadManager, Time.getCurrent());
this.stateFileManager = Objects.requireNonNull(stateFileManager);
this.stateSigner = Objects.requireNonNull(stateSigner);
this.sigCollector = Objects.requireNonNull(sigCollector);

hashLogger =
new HashLogger(threadManager, platformContext.getConfiguration().getConfigData(StateConfig.class));

final StateHashedTrigger stateHashedTrigger =
dispatchBuilder.getDispatcher(this, StateHashedTrigger.class)::dispatch;
signedStateHasher = new SignedStateHasher(signedStateMetrics, stateHashedTrigger, fatalErrorConsumer);

signedStateManager = new SignedStateManager(
platformContext.getConfiguration().getConfigData(StateConfig.class),
signedStateMetrics,
newLatestCompleteStateConsumer,
this::signatureCollectionDone,
this::signatureCollectionDone);
}

/**
* Signature for a signed state is now done. We should save it to disk, if it should be saved. The state may or may
* not have all its signatures collected.
*
* @param signedState the newly complete signed state
*/
private void signatureCollectionDone(@NonNull final SignedState signedState) {
if (signedState.isStateToSave()) {
stateFileManager.accept(signedState.reserve("save to disk"));
}
}

private void logHashes(final SignedState signedState) {
Expand All @@ -146,26 +117,19 @@ public void newSignedStateFromTransactions(@NonNull final ReservedSignedState si

stateSigner.accept(signedState.getAndReserve("signing state from transactions"));

signedStateManager.addState(signedState.get());
sigCollector.accept(
signedState.getAndReserve("DefaultStateManagementComponent.newSignedStateFromTransactions"));
}
}

/**
* {@inheritDoc}
*/
@Override
public List<SignedStateInfo> getSignedStateInfo() {
return signedStateManager.getSignedStateInfo();
}

/**
* {@inheritDoc}
*/
@Override
public void stateToLoad(final SignedState signedState, final SourceOfSignedState sourceOfSignedState) {
signedState.setGarbageCollector(signedStateGarbageCollector);
logHashes(signedState);
signedStateManager.addState(signedState);
sigCollector.accept(signedState.reserve("DefaultStateManagementComponent.stateToLoad"));
}

/**
Expand All @@ -185,13 +149,4 @@ public void stop() {
signedStateSentinel.stop();
signedStateGarbageCollector.stop();
}

/**
* {@inheritDoc}
*/
@NonNull
@Override
public SignedStateManager getSignedStateManager() {
return signedStateManager;
}
}
Loading

0 comments on commit e54c29c

Please sign in to comment.