fix: detect post-upgrade txn in presence of pre-upgrade events (#15834)
Signed-off-by: Michael Tinker <[email protected]>
tinker-michaelj authored Oct 16, 2024
1 parent 73d8b92 commit b0cc342
Showing 11 changed files with 85 additions and 43 deletions.
@@ -612,7 +612,8 @@ private List<StateChanges.Builder> onMigrate(
migrationStateChanges.addAll(migrationChanges);
kvStateChangeListener.reset();
boundaryStateChangeListener.reset();
if (isUpgrade && !trigger.equals(RECONNECT)) {
// For specifically a non-genesis upgrade, set in state that post-upgrade work is pending
if (isUpgrade && trigger != RECONNECT && trigger != GENESIS) {
unmarkMigrationRecordsStreamed(state);
migrationStateChanges.add(
StateChanges.newBuilder().stateChanges(boundaryStateChangeListener.allStateChanges()));
@@ -135,9 +135,11 @@ public ResponseCodeEnum update(@NonNull final Bytes bytes) {
currentSchedule = FeeSchedule.DEFAULT;
}

// Populate the map of HederaFunctionality -> FeeData for the current schedule
this.currentFeeDataMap = new HashMap<>();
populateFeeDataMap(currentFeeDataMap, currentSchedule.transactionFeeSchedule());
// Populate the map of HederaFunctionality -> FeeData for the current schedule, but avoid mutating
// the active one in-place as other threads may be using it for ingest/query fee calculations
final var newCurrentFeeDataMap = new HashMap<Entry, FeeData>();
populateFeeDataMap(newCurrentFeeDataMap, currentSchedule.transactionFeeSchedule());
this.currentFeeDataMap = newCurrentFeeDataMap;

// Get the expiration time of the current schedule
if (currentSchedule.hasExpiryTime()) {
@@ -160,9 +162,11 @@ public ResponseCodeEnum update(@NonNull final Bytes bytes) {
logger.warn("Unable to parse next fee schedule, will default to the current fee schedule.");
nextFeeDataMap = new HashMap<>(currentFeeDataMap);
} else {
// Populate the map of HederaFunctionality -> FeeData for the current schedule
this.nextFeeDataMap = new HashMap<>();
populateFeeDataMap(nextFeeDataMap, nextSchedule.transactionFeeSchedule());
// Populate the map of HederaFunctionality -> FeeData for the next schedule, but avoid mutating
// the active one in-place as other threads may be using it for ingest/query fee calculations
final var newNextFeeDataMap = new HashMap<Entry, FeeData>();
populateFeeDataMap(newNextFeeDataMap, nextSchedule.transactionFeeSchedule());
this.nextFeeDataMap = newNextFeeDataMap;
}

return SUCCESS;
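Both fee-schedule hunks above apply the same rebuild-and-swap pattern: populate a fresh map off to the side, then publish it with a single reference assignment so concurrent ingest/query fee calculations never observe a partially built map. A minimal, hypothetical sketch of the idea (simplified names and a volatile field are assumptions for the sketch, not the project's actual classes):

import java.util.HashMap;
import java.util.Map;

// Rebuild-and-swap: readers holding the old reference keep a fully populated map;
// the new map only becomes visible once it is complete.
class FeeTable {
    private volatile Map<String, Long> fees = Map.of();

    void reload(final Map<String, Long> source) {
        final Map<String, Long> fresh = new HashMap<>(source); // build off to the side...
        this.fees = fresh;                                      // ...then publish by reference swap
    }

    Long feeFor(final String functionality) {
        return fees.get(functionality); // never sees a half-built map
    }
}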
@@ -361,11 +361,6 @@ public void advanceConsensusClock(@NonNull final Instant consensusTime, @NonNull
.consTimeOfLastHandledTxn(Timestamp.newBuilder()
.seconds(consensusTime.getEpochSecond())
.nanos(consensusTime.getNano()));
if (!this.lastBlockInfo.migrationRecordsStreamed()) {
// Any records created during migration should have been published already. Now we shut off the flag to
// disallow further publishing
builder.migrationRecordsStreamed(true);
}
final var newBlockInfo = builder.build();

// Update the latest block info in state
@@ -67,7 +67,8 @@ public void migrate(@NonNull final MigrationContext ctx) {
final var isGenesis = ctx.previousVersion() == null;
if (isGenesis) {
final var blocksState = ctx.newStates().getSingleton(BLOCK_INFO_STATE_KEY);
final var blocks = new BlockInfo(-1, EPOCH, Bytes.EMPTY, EPOCH, false, EPOCH);
// Note there is by convention no post-upgrade work to do if starting from genesis
final var blocks = new BlockInfo(-1, EPOCH, Bytes.EMPTY, EPOCH, true, EPOCH);
blocksState.put(blocks);
final var runningHashState = ctx.newStates().getSingleton(RUNNING_HASHES_STATE_KEY);
final var runningHashes =
@@ -345,6 +345,7 @@ private HandleOutput execute(@NonNull final UserTxn userTxn) {
final var blockStreamConfig = userTxn.config().getConfigData(BlockStreamConfig.class);
try {
if (isOlderSoftwareEvent(userTxn)) {
advanceConsensusClock(userTxn, blockStreamConfig);
initializeBuilderInfo(userTxn.baseBuilder(), userTxn.txnInfo(), exchangeRateManager.exchangeRates())
.status(BUSY);
// Flushes the BUSY builder to the stream, no other side effects
@@ -367,16 +368,16 @@
streamBuilder.exchangeRate(exchangeRateManager.exchangeRates());
userTxn.stack().commitTransaction(streamBuilder);
}
if (blockStreamConfig.streamRecords()) {
blockRecordManager.markMigrationRecordsStreamed();
}
// C.f. https://github.com/hashgraph/hedera-services/issues/14751,
// here we may need to switch the newly adopted candidate roster
// in the RosterService state to become the active roster
}
final var dispatch = dispatchFor(userTxn, blockStreamConfig);
updateNodeStakes(userTxn, dispatch);
if (blockStreamConfig.streamRecords()) {
blockRecordManager.advanceConsensusClock(userTxn.consensusNow(), userTxn.state());
}
expireSchedules(userTxn);
advanceConsensusClock(userTxn, blockStreamConfig);
logPreDispatch(userTxn);
if (userTxn.type() == GENESIS_TRANSACTION) {
systemSetup.doGenesisSetup(dispatch);
@@ -404,6 +405,20 @@ private HandleOutput execute(@NonNull final UserTxn userTxn) {
}
}

/**
* Advances the consensus clock in state when streaming records; also expires any schedules.
* @param userTxn the user transaction
* @param blockStreamConfig the block stream configuration
*/
private void advanceConsensusClock(
@NonNull final UserTxn userTxn, @NonNull final BlockStreamConfig blockStreamConfig) {
if (blockStreamConfig.streamRecords()) {
// For POST_UPGRADE_TRANSACTION, also commits to state that the post-upgrade work is done
blockRecordManager.advanceConsensusClock(userTxn.consensusNow(), userTxn.state());
}
expireSchedules(userTxn);
}

/**
* Returns a stream of a single {@link ResponseCodeEnum#FAIL_INVALID} record
* for the given user transaction.
@@ -26,7 +26,6 @@
import com.hedera.hapi.node.base.HederaFunctionality;
import com.hedera.hapi.node.base.Key;
import com.hedera.hapi.node.state.blockrecords.BlockInfo;
import com.hedera.hapi.platform.state.PlatformState;
import com.hedera.node.app.blocks.impl.BoundaryStateChangeListener;
import com.hedera.node.app.blocks.impl.KVStateChangeListener;
import com.hedera.node.app.fees.ExchangeRateManager;
@@ -69,8 +68,6 @@
import com.hedera.node.config.data.ConsensusConfig;
import com.hedera.node.config.data.HederaConfig;
import com.swirlds.config.api.Configuration;
import com.swirlds.platform.state.service.PlatformStateService;
import com.swirlds.platform.state.service.schemas.V0540PlatformStateSchema;
import com.swirlds.platform.system.events.ConsensusEvent;
import com.swirlds.platform.system.transaction.ConsensusTransaction;
import com.swirlds.state.State;
@@ -255,6 +252,7 @@ public Dispatch newDispatch(

/**
* Returns the base stream builder for this user transaction.
*
* @return the base stream builder
*/
public StreamBuilder baseBuilder() {
@@ -263,24 +261,14 @@ public StreamBuilder baseBuilder() {

/**
* Returns whether the given state indicates this transaction is the first after an upgrade.
*
* @param state the Hedera state
* @return whether the given state indicates this transaction is the first after an upgrade
*/
private static boolean isUpgradeBoundary(@NonNull final State state) {
final var platformState = state.getReadableStates(PlatformStateService.NAME)
.<PlatformState>getSingleton(V0540PlatformStateSchema.PLATFORM_STATE_KEY)
final var blockInfo = state.getReadableStates(BlockRecordService.NAME)
.<BlockInfo>getSingleton(BLOCK_INFO_STATE_KEY)
.get();
requireNonNull(platformState);
if (platformState.freezeTime() == null
|| !platformState.freezeTimeOrThrow().equals(platformState.lastFrozenTime())) {
return false;
} else {
// Check the state directly here instead of going through BlockManager to allow us
// to manipulate this condition easily in embedded tests
final var blockInfo = state.getReadableStates(BlockRecordService.NAME)
.<BlockInfo>getSingleton(BLOCK_INFO_STATE_KEY)
.get();
return !requireNonNull(blockInfo).migrationRecordsStreamed();
}
return !requireNonNull(blockInfo).migrationRecordsStreamed();
}
}
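Read together with the earlier hunks, BlockInfo.migrationRecordsStreamed now doubles as the post-upgrade-work marker: genesis writes it as true (nothing pending), a non-genesis upgrade's onMigrate unmarks it, isUpgradeBoundary above reads it straight from state, and per the new handle-workflow helper's javadoc the first post-upgrade transaction commits it back to true. A condensed, hypothetical illustration of that lifecycle (not code from this commit; the real flag lives in BlockInfo state, not a field):

// Simplified illustration of the marker's lifecycle.
final class PostUpgradeMarker {
    private boolean migrationRecordsStreamed = true; // genesis convention: no work pending

    void onNonGenesisUpgrade() {
        migrationRecordsStreamed = false;            // post-upgrade work is now pending
    }

    boolean isUpgradeBoundary() {
        return !migrationRecordsStreamed;            // first post-upgrade txn detected here
    }

    void onPostUpgradeWorkDone() {
        migrationRecordsStreamed = true;             // later transactions are ordinary
    }
}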
@@ -87,7 +87,7 @@ void testRegisterSchemas() {
assertEquals(
new RunningHashes(GENESIS_HASH, Bytes.EMPTY, Bytes.EMPTY, Bytes.EMPTY),
runningHashesCapture.getValue());
assertEquals(new BlockInfo(-1, EPOCH, Bytes.EMPTY, EPOCH, false, EPOCH), blockInfoCapture.getValue());
assertEquals(new BlockInfo(-1, EPOCH, Bytes.EMPTY, EPOCH, true, EPOCH), blockInfoCapture.getValue());
} else {
assertThat(schema).isInstanceOf(V0560BlockRecordSchema.class);
}
@@ -59,6 +59,8 @@ private static class ProblemTracker {
List.of("Interrupted while waiting for signature verification"),
List.of("Could not start TLS server, will continue without it"),
List.of("Properties file", "does not exist and won't be used as configuration source"),
// Using a 1-minute staking period in CI can lead to periods with no transactions, breaking invariants
List.of("StakingRewardsHelper", "Pending rewards decreased"),
List.of("Throttle multiplier for CryptoTransfer throughput congestion has no throttle buckets"));

private int numProblems = 0;
@@ -219,7 +219,9 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -567,6 +569,37 @@ public static WaitForStatusOp waitForFrozenNetwork(@NonNull final Duration timeo
return new WaitForStatusOp(NodeSelector.allNodes(), FREEZE_COMPLETE, timeout);
}

/**
* Returns an operation that initiates background traffic running until the target network's
* first node has reached {@link com.swirlds.platform.system.status.PlatformStatus#FREEZE_COMPLETE}.
* @return the operation
*/
public static SpecOperation runBackgroundTrafficUntilFreezeComplete() {
return withOpContext((spec, opLog) -> {
opLog.info("Starting background traffic until freeze complete");
final var stopTraffic = new AtomicBoolean();
CompletableFuture.runAsync(() -> {
while (!stopTraffic.get()) {
allRunFor(
spec,
cryptoTransfer(tinyBarsFromTo(GENESIS, STAKING_REWARD, 1))
.deferStatusResolution()
.hasAnyStatusAtAll()
.orUnavailableStatus()
.noLogging());
}
});
spec.targetNetworkOrThrow()
.nodes()
.getFirst()
.statusFuture(FREEZE_COMPLETE, (status) -> {})
.thenRun(() -> {
stopTraffic.set(true);
opLog.info("Stopping background traffic after freeze complete");
});
});
}
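A note on the design: the asynchronous loop submits tiny, no-logging transfers and polls an AtomicBoolean that the first node's FREEZE_COMPLETE status future flips once the freeze lands. upgradeToConfigVersion below starts this traffic just before submitting freezeUpgrade, presumably so pre-upgrade events are still in flight at the upgrade boundary, the scenario the commit title addresses.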

public static HapiSpecSleep sleepFor(long timeMs) {
return new HapiSpecSleep(timeMs);
}
@@ -28,7 +28,6 @@
import static com.hedera.services.bdd.spec.dsl.operations.transactions.TouchBalancesOperation.touchBalanceOf;
import static com.hedera.services.bdd.spec.queries.QueryVerbs.getVersionInfo;
import static com.hedera.services.bdd.spec.transactions.TxnUtils.sysFileUpdateTo;
import static com.hedera.services.bdd.spec.transactions.TxnVerbs.cryptoCreate;
import static com.hedera.services.bdd.spec.transactions.TxnVerbs.cryptoTransfer;
import static com.hedera.services.bdd.spec.transactions.TxnVerbs.nodeCreate;
import static com.hedera.services.bdd.spec.transactions.TxnVerbs.nodeDelete;
@@ -145,9 +144,7 @@ final Stream<DynamicTest> sameNodesTest() {
prepareFakeUpgrade(),
validateUpgradeAddressBooks(DabEnabledUpgradeTest::hasClassicAddressMetadata),
upgradeToNextConfigVersion(),
assertExpectedConfigVersion(startVersion::get),
// Ensure we have a post-upgrade transaction to trigger system file exports
cryptoCreate("somebodyNew"));
assertExpectedConfigVersion(startVersion::get));
}
}

@@ -241,9 +238,7 @@ final Stream<DynamicTest> exportedAddressBookIncludesNodeId4() {
// node4 was not active before this the upgrade, so it could not have written a config.txt
validateUpgradeAddressBooks(exceptNodeIds(4L), addressBook -> assertThat(nodeIdsFrom(addressBook))
.contains(4L)),
upgradeToNextConfigVersion(FakeNmt.addNode(4L, DAB_GENERATED)),
// Ensure we have a post-upgrade transaction to trigger system file exports
cryptoCreate("somebodyNew"));
upgradeToNextConfigVersion(FakeNmt.addNode(4L, DAB_GENERATED)));
}
}

@@ -18,16 +18,19 @@

import static com.hedera.services.bdd.junit.hedera.MarkerFile.EXEC_IMMEDIATE_MF;
import static com.hedera.services.bdd.spec.queries.QueryVerbs.getVersionInfo;
import static com.hedera.services.bdd.spec.transactions.TxnVerbs.cryptoCreate;
import static com.hedera.services.bdd.spec.transactions.TxnVerbs.cryptoTransfer;
import static com.hedera.services.bdd.spec.transactions.crypto.HapiCryptoTransfer.tinyBarsFromTo;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.blockingOrder;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.buildUpgradeZipFrom;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.doAdhoc;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.doingContextual;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.freezeOnly;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.freezeUpgrade;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.noOp;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.prepareUpgrade;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.purgeUpgradeArtifacts;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.runBackgroundTrafficUntilFreezeComplete;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.sourcing;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.updateSpecialFile;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.waitForActive;
@@ -176,6 +179,7 @@ default SpecOperation upgradeToNextConfigVersion(@NonNull final SpecOperation...
default HapiSpecOperation upgradeToConfigVersion(final int version, @NonNull final SpecOperation... preRestartOps) {
requireNonNull(preRestartOps);
return blockingOrder(
runBackgroundTrafficUntilFreezeComplete(),
sourcing(() -> freezeUpgrade()
.startingIn(2)
.seconds()
@@ -185,7 +189,11 @@ default HapiSpecOperation upgradeToConfigVersion(final int version, @NonNull fin
blockingOrder(preRestartOps),
FakeNmt.restartNetwork(version),
doAdhoc(() -> CURRENT_CONFIG_VERSION.set(version)),
waitForActiveNetwork(RESTART_TIMEOUT));
waitForActiveNetwork(RESTART_TIMEOUT),
cryptoCreate("postUpgradeAccount"),
// Ensure we have a post-upgrade transaction in a new period to trigger
// system file exports while still streaming records
doingContextual(TxnUtils::triggerAndCloseAtLeastOneFileIfNotInterrupted));
}
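With the shared cryptoCreate("postUpgradeAccount") and file-closing step now part of upgradeToConfigVersion itself, the per-test cryptoCreate("somebodyNew") steps removed from the DabEnabledUpgradeTest hunks above are no longer needed; every upgrade run through this helper gets a post-upgrade transaction in a new period.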

/**