Skip to content

Commit

Permalink
Turn on health monitor
Browse files Browse the repository at this point in the history
Signed-off-by: Austin Littley <[email protected]>
  • Loading branch information
Austin Littley committed Jul 16, 2024
1 parent 2b65890 commit 28801bf
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import com.hedera.node.config.data.NettyConfig;
import com.hedera.node.config.data.NetworkAdminConfig;
import com.hedera.node.config.data.NodesConfig;
import com.hedera.node.config.data.PreHandleWorkflowConfig;
import com.hedera.node.config.data.RatesConfig;
import com.hedera.node.config.data.SchedulingConfig;
import com.hedera.node.config.data.SigsConfig;
Expand Down Expand Up @@ -128,7 +129,8 @@ public Set<Class<? extends Record>> getConfigDataTypes() {
TraceabilityConfig.class,
UpgradeConfig.class,
UtilPrngConfig.class,
VersionConfig.class);
VersionConfig.class,
PreHandleWorkflowConfig.class);
}

@NonNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@
import com.hedera.node.app.workflows.dispatcher.TransactionDispatcher;
import com.hedera.node.config.ConfigProvider;
import com.hedera.node.config.VersionedConfiguration;
import com.hedera.node.config.data.PreHandleWorkflowConfig;
import com.swirlds.platform.system.events.Event;
import com.swirlds.platform.system.transaction.Transaction;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Singleton;
Expand Down Expand Up @@ -95,6 +97,10 @@ public class PreHandleWorkflowImpl implements PreHandleWorkflow {
* Used for registering notice of transactionIDs seen by this node
*/
private final DeduplicationCache deduplicationCache;
/**
* Used to pre-handle transactions in parallel.
*/
private final ForkJoinPool preHandlePool;

/**
* Creates a new instance of {@code PreHandleWorkflowImpl}.
Expand All @@ -103,7 +109,6 @@ public class PreHandleWorkflowImpl implements PreHandleWorkflow {
* transaction.
* @param transactionChecker the {@link TransactionChecker} for parsing and verifying the transaction
* @param signatureVerifier the {@link SignatureVerifier} to verify signatures
* @throws NullPointerException if any of the parameters is {@code null}
*/
@Inject
public PreHandleWorkflowImpl(
Expand All @@ -119,6 +124,12 @@ public PreHandleWorkflowImpl(
this.signatureExpander = requireNonNull(signatureExpander);
this.configProvider = requireNonNull(configProvider);
this.deduplicationCache = requireNonNull(deduplicationCache);
final var config = configProvider.getConfiguration().getConfigData(PreHandleWorkflowConfig.class);
if (config.isCustomPoolEnabled()) {
preHandlePool = new ForkJoinPool(config.preHandleThreadCount());
} else {
preHandlePool = ForkJoinPool.commonPool();
}
}

/**
Expand All @@ -138,7 +149,7 @@ public void preHandle(
final var accountStore = readableStoreFactory.getStore(ReadableAccountStore.class);

// In parallel, we will pre-handle each transaction.
transactions.parallel().forEach(tx -> {
transactions.forEach(tx -> preHandlePool.execute(() -> {
if (tx.isSystem()) return;
try {
tx.setMetadata(preHandleTransaction(creator, readableStoreFactory, accountStore, tx));
Expand All @@ -150,7 +161,7 @@ public void preHandle(
"Possibly CATASTROPHIC failure while running the pre-handle workflow", unexpectedException);
tx.setMetadata(unknownFailure());
}
});
}));
}

// For each transaction, we will use a background thread to parse the transaction, validate it, lookup the
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.node.config.data;

import com.hedera.node.config.NetworkProperty;
import com.swirlds.config.api.ConfigData;
import com.swirlds.config.api.ConfigProperty;

@ConfigData("preHandleWorkflow")
public record PreHandleWorkflowConfig(
@ConfigProperty(defaultValue = "true") @NetworkProperty boolean isCustomPoolEnabled,
@ConfigProperty(defaultValue = "8") @NetworkProperty int preHandleThreadCount) {}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import com.hedera.node.config.data.NettyConfig;
import com.hedera.node.config.data.NetworkAdminConfig;
import com.hedera.node.config.data.NodesConfig;
import com.hedera.node.config.data.PreHandleWorkflowConfig;
import com.hedera.node.config.data.RatesConfig;
import com.hedera.node.config.data.SchedulingConfig;
import com.hedera.node.config.data.SigsConfig;
Expand Down Expand Up @@ -189,6 +190,7 @@ public static TestConfigBuilder create() {
.withConfigDataType(UtilPrngConfig.class)
.withConfigDataType(VersionConfig.class)
.withConfigDataType(NodesConfig.class)
.withConfigDataType(PreHandleWorkflowConfig.class)
.withConverter(CongestionMultipliers.class, new CongestionMultipliersConverter())
.withConverter(EntityScaleFactors.class, new EntityScaleFactorsConverter())
.withConverter(KnownBlockValues.class, new KnownBlockValuesConverter())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@
*/
@ConfigData("platform.wiring")
public record WiringConfig(
@ConfigProperty(defaultValue = "false") boolean healthMonitorEnabled,
@ConfigProperty(defaultValue = "true") boolean hardBackpressureEnabled,
@ConfigProperty(defaultValue = "1.0") double defaultPoolMultiplier,
@ConfigProperty(defaultValue = "0") int defaultPoolConstant,
@ConfigProperty(defaultValue = "true") boolean healthMonitorEnabled,
@ConfigProperty(defaultValue = "false") boolean hardBackpressureEnabled,
@ConfigProperty(defaultValue = "0") double defaultPoolMultiplier,
@ConfigProperty(defaultValue = "8") int defaultPoolConstant,
@ConfigProperty(defaultValue = "500") int healthMonitorSchedulerCapacity,
@ConfigProperty(defaultValue = "100ms") Duration healthMonitorHeartbeatPeriod,
@ConfigProperty(defaultValue = "5s") Duration healthLogThreshold,
@ConfigProperty(defaultValue = "1s") Duration healthLogThreshold,
@ConfigProperty(defaultValue = "10m") Duration healthLogPeriod) {}
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,5 @@ public record EventCreationConfig(
@ConfigProperty(defaultValue = "10") double antiSelfishnessFactor,
@ConfigProperty(defaultValue = "10") int tipsetSnapshotHistorySize,
@ConfigProperty(defaultValue = "1024") int eventIntakeThrottle,
@ConfigProperty(defaultValue = "true") boolean useLegacyBackpressure,
@ConfigProperty(defaultValue = "5s") Duration maximumPermissibleUnhealthyDuration) {}
@ConfigProperty(defaultValue = "false") boolean useLegacyBackpressure,
@ConfigProperty(defaultValue = "1s") Duration maximumPermissibleUnhealthyDuration) {}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public record EventConfig(
@ConfigProperty(defaultValue = "/opt/hgcapp/eventsStreams") String eventsLogDir,
@ConfigProperty(defaultValue = "true") boolean enableEventStreaming,
@ConfigProperty(defaultValue = "false") boolean useBirthRoundAncientThreshold,
@ConfigProperty(defaultValue = "true") boolean useOldStyleIntakeQueue) {
@ConfigProperty(defaultValue = "false") boolean useOldStyleIntakeQueue) {

/**
* @return the {@link AncientMode} based on useBirthRoundAncientThreshold
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@
public record SyncConfig(
@ConfigProperty(defaultValue = "25") int syncSleepAfterFailedNegotiation,
@ConfigProperty(defaultValue = "17") int syncProtocolPermitCount,
@ConfigProperty(defaultValue = "false") boolean onePermitPerPeer,
@ConfigProperty(defaultValue = "true") boolean onePermitPerPeer,
@ConfigProperty(defaultValue = "1000") int syncProtocolHeartbeatPeriod,
@ConfigProperty(defaultValue = "true") boolean waitForEventsInIntake,
@ConfigProperty(defaultValue = "true") boolean filterLikelyDuplicates,
@ConfigProperty(defaultValue = "3s") Duration nonAncestorFilterThreshold,
@ConfigProperty(defaultValue = "500ms") Duration syncKeepalivePeriod,
@ConfigProperty(defaultValue = "1m") Duration maxSyncTime,
@ConfigProperty(defaultValue = "0") int maxSyncEventCount,
@ConfigProperty(defaultValue = "5s") Duration unhealthyGracePeriod,
@ConfigProperty(defaultValue = "5000") int maxSyncEventCount,
@ConfigProperty(defaultValue = "1s") Duration unhealthyGracePeriod,
@ConfigProperty(defaultValue = "5") double permitsRevokedPerSecond,
@ConfigProperty(defaultValue = "0.1") double permitsReturnedPerSecond,
@ConfigProperty(defaultValue = "1") int minimumHealthyUnrevokedPermitCount) {}
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,19 @@
@ConfigData("platformSchedulers")
public record PlatformSchedulersConfig(
@ConfigProperty(defaultValue = "500") int eventHasherUnhandledCapacity,
@ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(500) FLUSHABLE UNHANDLED_TASK_METRIC")
@ConfigProperty(defaultValue = "CONCURRENT CAPACITY(500) FLUSHABLE UNHANDLED_TASK_METRIC")
TaskSchedulerConfiguration internalEventValidator,
@ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(500) FLUSHABLE UNHANDLED_TASK_METRIC")
@ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(5000) FLUSHABLE UNHANDLED_TASK_METRIC")
TaskSchedulerConfiguration eventDeduplicator,
@ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(500) FLUSHABLE UNHANDLED_TASK_METRIC")
@ConfigProperty(defaultValue = "CONCURRENT CAPACITY(500) FLUSHABLE UNHANDLED_TASK_METRIC")
TaskSchedulerConfiguration eventSignatureValidator,
@ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(500) FLUSHABLE UNHANDLED_TASK_METRIC")
TaskSchedulerConfiguration orphanBuffer,
@ConfigProperty(
defaultValue =
"SEQUENTIAL_THREAD CAPACITY(500) FLUSHABLE SQUELCHABLE UNHANDLED_TASK_METRIC BUSY_FRACTION_METRIC")
TaskSchedulerConfiguration consensusEngine,
@ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(5000) FLUSHABLE SQUELCHABLE UNHANDLED_TASK_METRIC")
@ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(500) FLUSHABLE SQUELCHABLE UNHANDLED_TASK_METRIC")
TaskSchedulerConfiguration eventCreationManager,
@ConfigProperty(defaultValue = "DIRECT") TaskSchedulerConfiguration selfEventSigner,
@ConfigProperty(defaultValue = "SEQUENTIAL_THREAD CAPACITY(20) UNHANDLED_TASK_METRIC")
Expand All @@ -93,7 +93,7 @@ public record PlatformSchedulersConfig(
TaskSchedulerConfiguration stateSignatureCollector,
@ConfigProperty(
defaultValue =
"SEQUENTIAL_THREAD CAPACITY(5) FLUSHABLE SQUELCHABLE UNHANDLED_TASK_METRIC BUSY_FRACTION_METRIC")
"SEQUENTIAL_THREAD CAPACITY(30) FLUSHABLE SQUELCHABLE UNHANDLED_TASK_METRIC BUSY_FRACTION_METRIC")
TaskSchedulerConfiguration transactionHandler,
@ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(500) UNHANDLED_TASK_METRIC")
TaskSchedulerConfiguration issDetector,
Expand Down Expand Up @@ -126,12 +126,12 @@ public record PlatformSchedulersConfig(
@ConfigProperty(defaultValue = "DIRECT_THREADSAFE") TaskSchedulerConfiguration transactionPool,
@ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(500) FLUSHABLE UNHANDLED_TASK_METRIC")
TaskSchedulerConfiguration gossip,
@ConfigProperty(defaultValue = "CONCURRENT CAPACITY(-1) UNHANDLED_TASK_METRIC")
@ConfigProperty(defaultValue = "CONCURRENT CAPACITY(5000) UNHANDLED_TASK_METRIC")
TaskSchedulerConfiguration eventHasher,
@ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(-1) UNHANDLED_TASK_METRIC")
TaskSchedulerConfiguration postHashCollector,
@ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(500) FLUSHABLE UNHANDLED_TASK_METRIC")
TaskSchedulerConfiguration branchDetector,
@ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(500) FLUSHABLE UNHANDLED_TASK_METRIC")
TaskSchedulerConfiguration branchReporter,
@ConfigProperty(defaultValue = "true") boolean hashCollectorEnabled) {}
@ConfigProperty(defaultValue = "false") boolean hashCollectorEnabled) {}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ void statusPreventsCreation() {
assertSame(eventsToCreate.get(1), e1);
}

/**
* This form of backpressure is not currently enabled.
*/
@Disabled
@Test
void backpressurePreventsCreation() {
final UnsignedEvent e0 = manager.maybeCreateEvent();
Expand Down Expand Up @@ -159,10 +163,6 @@ void ratePreventsCreation() {
assertSame(eventsToCreate.get(1), e1);
}

/**
* This form of backpressure is not currently enabled.
*/
@Disabled
@Test
void unhealthyNodePreventsCreation() {
final UnsignedEvent e0 = manager.maybeCreateEvent();
Expand Down

0 comments on commit 28801bf

Please sign in to comment.