diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/config/ServicesConfigExtension.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/config/ServicesConfigExtension.java index 15022206e6d5..9aa224f8d45d 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/config/ServicesConfigExtension.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/config/ServicesConfigExtension.java @@ -60,6 +60,7 @@ import com.hedera.node.config.data.NettyConfig; import com.hedera.node.config.data.NetworkAdminConfig; import com.hedera.node.config.data.NodesConfig; +import com.hedera.node.config.data.PreHandleWorkflowConfig; import com.hedera.node.config.data.RatesConfig; import com.hedera.node.config.data.SchedulingConfig; import com.hedera.node.config.data.SigsConfig; @@ -128,7 +129,8 @@ public Set> getConfigDataTypes() { TraceabilityConfig.class, UpgradeConfig.class, UtilPrngConfig.class, - VersionConfig.class); + VersionConfig.class, + PreHandleWorkflowConfig.class); } @NonNull diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/workflows/prehandle/PreHandleWorkflowImpl.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/workflows/prehandle/PreHandleWorkflowImpl.java index 997e7da9a7d0..4e345be8e48f 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/workflows/prehandle/PreHandleWorkflowImpl.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/workflows/prehandle/PreHandleWorkflowImpl.java @@ -47,6 +47,7 @@ import com.hedera.node.app.workflows.dispatcher.TransactionDispatcher; import com.hedera.node.config.ConfigProvider; import com.hedera.node.config.VersionedConfiguration; +import com.hedera.node.config.data.PreHandleWorkflowConfig; import com.swirlds.platform.system.events.Event; import com.swirlds.platform.system.transaction.Transaction; import edu.umd.cs.findbugs.annotations.NonNull; @@ -54,6 +55,7 @@ import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.ForkJoinPool; import java.util.stream.Stream; import javax.inject.Inject; import javax.inject.Singleton; @@ -95,6 +97,10 @@ public class PreHandleWorkflowImpl implements PreHandleWorkflow { * Used for registering notice of transactionIDs seen by this node */ private final DeduplicationCache deduplicationCache; + /** + * Used to pre-handle transactions in parallel. + */ + private final ForkJoinPool preHandlePool; /** * Creates a new instance of {@code PreHandleWorkflowImpl}. @@ -103,7 +109,6 @@ public class PreHandleWorkflowImpl implements PreHandleWorkflow { * transaction. * @param transactionChecker the {@link TransactionChecker} for parsing and verifying the transaction * @param signatureVerifier the {@link SignatureVerifier} to verify signatures - * @throws NullPointerException if any of the parameters is {@code null} */ @Inject public PreHandleWorkflowImpl( @@ -119,6 +124,12 @@ public PreHandleWorkflowImpl( this.signatureExpander = requireNonNull(signatureExpander); this.configProvider = requireNonNull(configProvider); this.deduplicationCache = requireNonNull(deduplicationCache); + final var config = configProvider.getConfiguration().getConfigData(PreHandleWorkflowConfig.class); + if (config.isCustomPoolEnabled()) { + preHandlePool = new ForkJoinPool(config.preHandleThreadCount()); + } else { + preHandlePool = ForkJoinPool.commonPool(); + } } /** @@ -138,7 +149,7 @@ public void preHandle( final var accountStore = readableStoreFactory.getStore(ReadableAccountStore.class); // In parallel, we will pre-handle each transaction. - transactions.parallel().forEach(tx -> { + transactions.forEach(tx -> preHandlePool.execute(() -> { if (tx.isSystem()) return; try { tx.setMetadata(preHandleTransaction(creator, readableStoreFactory, accountStore, tx)); @@ -150,7 +161,7 @@ public void preHandle( "Possibly CATASTROPHIC failure while running the pre-handle workflow", unexpectedException); tx.setMetadata(unknownFailure()); } - }); + })); } // For each transaction, we will use a background thread to parse the transaction, validate it, lookup the diff --git a/hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/PreHandleWorkflowConfig.java b/hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/PreHandleWorkflowConfig.java new file mode 100644 index 000000000000..6b8ace0863ff --- /dev/null +++ b/hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/PreHandleWorkflowConfig.java @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.node.config.data; + +import com.hedera.node.config.NetworkProperty; +import com.swirlds.config.api.ConfigData; +import com.swirlds.config.api.ConfigProperty; + +@ConfigData("preHandleWorkflow") +public record PreHandleWorkflowConfig( + @ConfigProperty(defaultValue = "true") @NetworkProperty boolean isCustomPoolEnabled, + @ConfigProperty(defaultValue = "8") @NetworkProperty int preHandleThreadCount) {} diff --git a/hedera-node/hedera-config/src/testFixtures/java/com/hedera/node/config/testfixtures/HederaTestConfigBuilder.java b/hedera-node/hedera-config/src/testFixtures/java/com/hedera/node/config/testfixtures/HederaTestConfigBuilder.java index fb878ffebc57..de5f9631d146 100644 --- a/hedera-node/hedera-config/src/testFixtures/java/com/hedera/node/config/testfixtures/HederaTestConfigBuilder.java +++ b/hedera-node/hedera-config/src/testFixtures/java/com/hedera/node/config/testfixtures/HederaTestConfigBuilder.java @@ -61,6 +61,7 @@ import com.hedera.node.config.data.NettyConfig; import com.hedera.node.config.data.NetworkAdminConfig; import com.hedera.node.config.data.NodesConfig; +import com.hedera.node.config.data.PreHandleWorkflowConfig; import com.hedera.node.config.data.RatesConfig; import com.hedera.node.config.data.SchedulingConfig; import com.hedera.node.config.data.SigsConfig; @@ -189,6 +190,7 @@ public static TestConfigBuilder create() { .withConfigDataType(UtilPrngConfig.class) .withConfigDataType(VersionConfig.class) .withConfigDataType(NodesConfig.class) + .withConfigDataType(PreHandleWorkflowConfig.class) .withConverter(CongestionMultipliers.class, new CongestionMultipliersConverter()) .withConverter(EntityScaleFactors.class, new EntityScaleFactorsConverter()) .withConverter(KnownBlockValues.class, new KnownBlockValuesConverter()) diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/WiringConfig.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/WiringConfig.java index 180418dc326f..c8f62c815303 100644 --- a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/WiringConfig.java +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/WiringConfig.java @@ -41,11 +41,11 @@ */ @ConfigData("platform.wiring") public record WiringConfig( - @ConfigProperty(defaultValue = "false") boolean healthMonitorEnabled, - @ConfigProperty(defaultValue = "true") boolean hardBackpressureEnabled, - @ConfigProperty(defaultValue = "1.0") double defaultPoolMultiplier, - @ConfigProperty(defaultValue = "0") int defaultPoolConstant, + @ConfigProperty(defaultValue = "true") boolean healthMonitorEnabled, + @ConfigProperty(defaultValue = "false") boolean hardBackpressureEnabled, + @ConfigProperty(defaultValue = "0") double defaultPoolMultiplier, + @ConfigProperty(defaultValue = "8") int defaultPoolConstant, @ConfigProperty(defaultValue = "500") int healthMonitorSchedulerCapacity, @ConfigProperty(defaultValue = "100ms") Duration healthMonitorHeartbeatPeriod, - @ConfigProperty(defaultValue = "5s") Duration healthLogThreshold, + @ConfigProperty(defaultValue = "1s") Duration healthLogThreshold, @ConfigProperty(defaultValue = "10m") Duration healthLogPeriod) {} diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/creation/EventCreationConfig.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/creation/EventCreationConfig.java index a97b35cda929..5457c70203ac 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/creation/EventCreationConfig.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/creation/EventCreationConfig.java @@ -54,5 +54,5 @@ public record EventCreationConfig( @ConfigProperty(defaultValue = "10") double antiSelfishnessFactor, @ConfigProperty(defaultValue = "10") int tipsetSnapshotHistorySize, @ConfigProperty(defaultValue = "1024") int eventIntakeThrottle, - @ConfigProperty(defaultValue = "true") boolean useLegacyBackpressure, - @ConfigProperty(defaultValue = "5s") Duration maximumPermissibleUnhealthyDuration) {} + @ConfigProperty(defaultValue = "false") boolean useLegacyBackpressure, + @ConfigProperty(defaultValue = "1s") Duration maximumPermissibleUnhealthyDuration) {} diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/eventhandling/EventConfig.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/eventhandling/EventConfig.java index 7b58ebf4fc9f..2d731fab9117 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/eventhandling/EventConfig.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/eventhandling/EventConfig.java @@ -44,7 +44,7 @@ public record EventConfig( @ConfigProperty(defaultValue = "/opt/hgcapp/eventsStreams") String eventsLogDir, @ConfigProperty(defaultValue = "true") boolean enableEventStreaming, @ConfigProperty(defaultValue = "false") boolean useBirthRoundAncientThreshold, - @ConfigProperty(defaultValue = "true") boolean useOldStyleIntakeQueue) { + @ConfigProperty(defaultValue = "false") boolean useOldStyleIntakeQueue) { /** * @return the {@link AncientMode} based on useBirthRoundAncientThreshold diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/sync/config/SyncConfig.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/sync/config/SyncConfig.java index 0bfa8a3c7ae1..289edbb96d4d 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/sync/config/SyncConfig.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/sync/config/SyncConfig.java @@ -55,15 +55,15 @@ public record SyncConfig( @ConfigProperty(defaultValue = "25") int syncSleepAfterFailedNegotiation, @ConfigProperty(defaultValue = "17") int syncProtocolPermitCount, - @ConfigProperty(defaultValue = "false") boolean onePermitPerPeer, + @ConfigProperty(defaultValue = "true") boolean onePermitPerPeer, @ConfigProperty(defaultValue = "1000") int syncProtocolHeartbeatPeriod, @ConfigProperty(defaultValue = "true") boolean waitForEventsInIntake, @ConfigProperty(defaultValue = "true") boolean filterLikelyDuplicates, @ConfigProperty(defaultValue = "3s") Duration nonAncestorFilterThreshold, @ConfigProperty(defaultValue = "500ms") Duration syncKeepalivePeriod, @ConfigProperty(defaultValue = "1m") Duration maxSyncTime, - @ConfigProperty(defaultValue = "0") int maxSyncEventCount, - @ConfigProperty(defaultValue = "5s") Duration unhealthyGracePeriod, + @ConfigProperty(defaultValue = "5000") int maxSyncEventCount, + @ConfigProperty(defaultValue = "1s") Duration unhealthyGracePeriod, @ConfigProperty(defaultValue = "5") double permitsRevokedPerSecond, @ConfigProperty(defaultValue = "0.1") double permitsReturnedPerSecond, @ConfigProperty(defaultValue = "1") int minimumHealthyUnrevokedPermitCount) {} diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformSchedulersConfig.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformSchedulersConfig.java index 4591e549e550..e360d16d21ed 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformSchedulersConfig.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformSchedulersConfig.java @@ -65,11 +65,11 @@ @ConfigData("platformSchedulers") public record PlatformSchedulersConfig( @ConfigProperty(defaultValue = "500") int eventHasherUnhandledCapacity, - @ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(500) FLUSHABLE UNHANDLED_TASK_METRIC") + @ConfigProperty(defaultValue = "CONCURRENT CAPACITY(500) FLUSHABLE UNHANDLED_TASK_METRIC") TaskSchedulerConfiguration internalEventValidator, - @ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(500) FLUSHABLE UNHANDLED_TASK_METRIC") + @ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(5000) FLUSHABLE UNHANDLED_TASK_METRIC") TaskSchedulerConfiguration eventDeduplicator, - @ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(500) FLUSHABLE UNHANDLED_TASK_METRIC") + @ConfigProperty(defaultValue = "CONCURRENT CAPACITY(500) FLUSHABLE UNHANDLED_TASK_METRIC") TaskSchedulerConfiguration eventSignatureValidator, @ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(500) FLUSHABLE UNHANDLED_TASK_METRIC") TaskSchedulerConfiguration orphanBuffer, @@ -77,7 +77,7 @@ public record PlatformSchedulersConfig( defaultValue = "SEQUENTIAL_THREAD CAPACITY(500) FLUSHABLE SQUELCHABLE UNHANDLED_TASK_METRIC BUSY_FRACTION_METRIC") TaskSchedulerConfiguration consensusEngine, - @ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(5000) FLUSHABLE SQUELCHABLE UNHANDLED_TASK_METRIC") + @ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(500) FLUSHABLE SQUELCHABLE UNHANDLED_TASK_METRIC") TaskSchedulerConfiguration eventCreationManager, @ConfigProperty(defaultValue = "DIRECT") TaskSchedulerConfiguration selfEventSigner, @ConfigProperty(defaultValue = "SEQUENTIAL_THREAD CAPACITY(20) UNHANDLED_TASK_METRIC") @@ -93,7 +93,7 @@ public record PlatformSchedulersConfig( TaskSchedulerConfiguration stateSignatureCollector, @ConfigProperty( defaultValue = - "SEQUENTIAL_THREAD CAPACITY(5) FLUSHABLE SQUELCHABLE UNHANDLED_TASK_METRIC BUSY_FRACTION_METRIC") + "SEQUENTIAL_THREAD CAPACITY(30) FLUSHABLE SQUELCHABLE UNHANDLED_TASK_METRIC BUSY_FRACTION_METRIC") TaskSchedulerConfiguration transactionHandler, @ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(500) UNHANDLED_TASK_METRIC") TaskSchedulerConfiguration issDetector, @@ -126,7 +126,7 @@ public record PlatformSchedulersConfig( @ConfigProperty(defaultValue = "DIRECT_THREADSAFE") TaskSchedulerConfiguration transactionPool, @ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(500) FLUSHABLE UNHANDLED_TASK_METRIC") TaskSchedulerConfiguration gossip, - @ConfigProperty(defaultValue = "CONCURRENT CAPACITY(-1) UNHANDLED_TASK_METRIC") + @ConfigProperty(defaultValue = "CONCURRENT CAPACITY(5000) UNHANDLED_TASK_METRIC") TaskSchedulerConfiguration eventHasher, @ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(-1) UNHANDLED_TASK_METRIC") TaskSchedulerConfiguration postHashCollector, @@ -134,4 +134,4 @@ public record PlatformSchedulersConfig( TaskSchedulerConfiguration branchDetector, @ConfigProperty(defaultValue = "SEQUENTIAL CAPACITY(500) FLUSHABLE UNHANDLED_TASK_METRIC") TaskSchedulerConfiguration branchReporter, - @ConfigProperty(defaultValue = "true") boolean hashCollectorEnabled) {} + @ConfigProperty(defaultValue = "false") boolean hashCollectorEnabled) {} diff --git a/platform-sdk/swirlds-unit-tests/core/swirlds-platform-test/src/test/java/com/swirlds/platform/test/event/tipset/EventCreationManagerTests.java b/platform-sdk/swirlds-unit-tests/core/swirlds-platform-test/src/test/java/com/swirlds/platform/test/event/tipset/EventCreationManagerTests.java index cacc5c40b516..9460c147d544 100644 --- a/platform-sdk/swirlds-unit-tests/core/swirlds-platform-test/src/test/java/com/swirlds/platform/test/event/tipset/EventCreationManagerTests.java +++ b/platform-sdk/swirlds-unit-tests/core/swirlds-platform-test/src/test/java/com/swirlds/platform/test/event/tipset/EventCreationManagerTests.java @@ -116,6 +116,10 @@ void statusPreventsCreation() { assertSame(eventsToCreate.get(1), e1); } + /** + * This form of backpressure is not currently enabled. + */ + @Disabled @Test void backpressurePreventsCreation() { final UnsignedEvent e0 = manager.maybeCreateEvent(); @@ -159,10 +163,6 @@ void ratePreventsCreation() { assertSame(eventsToCreate.get(1), e1); } - /** - * This form of backpressure is not currently enabled. - */ - @Disabled @Test void unhealthyNodePreventsCreation() { final UnsignedEvent e0 = manager.maybeCreateEvent();