Skip to content

Commit

Permalink
feat: Various block stream modifications (#15233)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Hess <[email protected]>
  • Loading branch information
mhess-swl authored Aug 29, 2024
1 parent dc89d1c commit f9f93bd
Show file tree
Hide file tree
Showing 16 changed files with 183 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public interface DeleteCapableTransactionStreamBuilder extends StreamBuilder {
AccountID getDeletedAccountBeneficiaryFor(@NonNull final AccountID deletedAccountID);

/**
* Adds a beneficiary for a deleted account.
* Adds a beneficiary for a deleted account into the map. This is needed while computing staking rewards.
* If the deleted account receives staking reward, it is transferred to the beneficiary.
*
* @param deletedAccountID the deleted account ID
* @param beneficiaryForDeletedAccount the beneficiary account ID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,21 @@ public static Bytes lastBlockHash(@NonNull final BlockInfo blockInfo) {
*/
@Nullable
public static Bytes blockHashByBlockNumber(@NonNull final BlockInfo blockInfo, final long blockNo) {
final var blockHashes = blockInfo.blockHashes();
return blockHashByBlockNumber(blockInfo.blockHashes(), blockInfo.lastBlockNumber(), blockNo);
}

/**
* Given a concatenated sequence of 48-byte block hashes, where the rightmost hash was
* for the given last block number, returns either the hash of the block at the given
* block number, or null if the block number is out of range.
*
* @param blockHashes the concatenated sequence of block hashes
* @param lastBlockNo the block number of the rightmost hash in the sequence
* @param blockNo the block number of the hash to return
* @return the hash of the block at the given block number if available, null otherwise
*/
public static @Nullable Bytes blockHashByBlockNumber(
@NonNull final Bytes blockHashes, final long lastBlockNo, final long blockNo) {
final var blocksAvailable = blockHashes.length() / HASH_SIZE;

// Smart contracts (and other services) call this API. Should a smart contract call this, we don't really
Expand All @@ -77,8 +91,6 @@ public static Bytes blockHashByBlockNumber(@NonNull final BlockInfo blockInfo, f
if (blockNo < 0) {
return null;
}

final var lastBlockNo = blockInfo.lastBlockNumber();
final var firstAvailableBlockNo = lastBlockNo - blocksAvailable + 1;
// If blocksAvailable == 0, then firstAvailable == blockNo; and all numbers are
// either less than or greater than or equal to blockNo, so we return unavailable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static java.util.Objects.requireNonNull;

import com.hedera.node.app.roster.schemas.V0540RosterSchema;
import com.swirlds.common.RosterStateId;
import com.swirlds.state.spi.SchemaRegistry;
import com.swirlds.state.spi.Service;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand All @@ -30,10 +29,13 @@
* Not exposed outside `hedera-app`.
*/
public class RosterServiceImpl implements Service {
/** The name of this service */
public static final String NAME = "RosterService";

@NonNull
@Override
public String getServiceName() {
return RosterStateId.NAME;
return NAME;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.hedera.hapi.node.state.primitives.ProtoBytes;
import com.hedera.hapi.node.state.roster.Roster;
import com.hedera.hapi.node.state.roster.RosterState;
import com.swirlds.common.RosterStateId;
import com.swirlds.state.spi.MigrationContext;
import com.swirlds.state.spi.Schema;
import com.swirlds.state.spi.StateDefinition;
Expand All @@ -34,6 +33,8 @@
*/
public class V0540RosterSchema extends Schema {
private static final Logger log = LogManager.getLogger(V0540RosterSchema.class);
public static final String ROSTER_KEY = "ROSTERS";
public static final String ROSTER_STATES_KEY = "ROSTER_STATE";
/** this can't be increased later so we pick some number large enough, 2^16. */
private static final long MAX_ROSTERS = 65_536L;

Expand All @@ -54,13 +55,13 @@ public V0540RosterSchema() {
@Override
public Set<StateDefinition> statesToCreate() {
return Set.of(
StateDefinition.singleton(RosterStateId.ROSTER_STATES_KEY, RosterState.PROTOBUF),
StateDefinition.onDisk(RosterStateId.ROSTER_KEY, ProtoBytes.PROTOBUF, Roster.PROTOBUF, MAX_ROSTERS));
StateDefinition.singleton(ROSTER_STATES_KEY, RosterState.PROTOBUF),
StateDefinition.onDisk(ROSTER_KEY, ProtoBytes.PROTOBUF, Roster.PROTOBUF, MAX_ROSTERS));
}

@Override
public void migrate(@NonNull final MigrationContext ctx) {
final var rosterState = ctx.newStates().getSingleton(RosterStateId.ROSTER_STATES_KEY);
final var rosterState = ctx.newStates().getSingleton(ROSTER_STATES_KEY);
if (rosterState.get() == null) {
log.info("Creating default roster state");
rosterState.put(RosterState.DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
*/
/*@ThreadSafe*/
public interface HederaRecordCache extends RecordCache {
/*
/**
* Records the fact that the given {@link TransactionID} has been seen by the given node. If the node has already
* been seen, then this call is a no-op. This call does not perform any additional validation of the transaction ID.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,14 +301,14 @@ private HandleOutput execute(@NonNull final UserTxn userTxn) {
initializeBuilderInfo(userTxn.baseBuilder(), userTxn.txnInfo(), exchangeRateManager.exchangeRates())
.status(BUSY);
// Flushes the BUSY builder to the stream, no other side effects
userTxn.stack().commitFullStack();
userTxn.stack().commitTransaction(userTxn.baseBuilder());
} else {
if (userTxn.type() == GENESIS_TRANSACTION) {
// (FUTURE) Once all genesis setup is done via dispatch, remove this method
systemSetup.externalizeInitSideEffects(userTxn.tokenContextImpl());
systemSetup.externalizeInitSideEffects(
userTxn.tokenContextImpl(), exchangeRateManager.exchangeRates());
}
updateNodeStakes(userTxn);

final var streamsRecords = configProvider
.getConfiguration()
.getConfigData(BlockStreamConfig.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.hedera.hapi.node.state.common.EntityNumber;
import com.hedera.hapi.node.state.token.Account;
import com.hedera.hapi.node.token.CryptoCreateTransactionBody;
import com.hedera.hapi.node.transaction.ExchangeRateSet;
import com.hedera.hapi.node.transaction.TransactionBody;
import com.hedera.node.app.ids.EntityIdService;
import com.hedera.node.app.service.addressbook.ReadableNodeStore;
Expand Down Expand Up @@ -123,7 +124,6 @@ public SystemSetup(
public void doGenesisSetup(@NonNull final Dispatch dispatch) {
final var systemContext = systemContextFor(dispatch);
fileService.createSystemEntities(systemContext);
dispatch.stack().commitFullStack();
}

/**
Expand Down Expand Up @@ -249,6 +249,7 @@ public void dispatchCreation(@NonNull final TransactionBody txBody, final long e
recordBuilder.status());
}
controlledNum.put(new EntityNumber(firstUserNum - 1));
dispatch.stack().commitSystemStateChanges();
}

@Override
Expand Down Expand Up @@ -285,12 +286,13 @@ public Instant now() {
* Called only once, before handling the first transaction in network history. Externalizes
* side effects of genesis setup done in
* {@link com.swirlds.platform.system.SwirldState#init(Platform, InitTrigger, SoftwareVersion)}.
* <p>
* Should be removed once
*
* @throws NullPointerException if called more than once
*/
public void externalizeInitSideEffects(@NonNull final TokenContext context) {
public void externalizeInitSideEffects(
@NonNull final TokenContext context, @NonNull final ExchangeRateSet exchangeRateSet) {
requireNonNull(context);
requireNonNull(exchangeRateSet);
final var firstConsensusTime = context.consensusTime();
log.info("Doing genesis setup at {}", firstConsensusTime);
// The account creator registers all its synthetics accounts based on the
Expand All @@ -305,32 +307,33 @@ public void externalizeInitSideEffects(@NonNull final TokenContext context) {
this::blocklistAccounts);

if (!systemAccounts.isEmpty()) {
createAccountRecordBuilders(systemAccounts, context, SYSTEM_ACCOUNT_CREATION_MEMO);
createAccountRecordBuilders(systemAccounts, context, SYSTEM_ACCOUNT_CREATION_MEMO, exchangeRateSet);
log.info(" - Queued {} system account records", systemAccounts.size());
systemAccounts = null;
}

if (!treasuryClones.isEmpty()) {
createAccountRecordBuilders(treasuryClones, context, TREASURY_CLONE_MEMO, exchangeRateSet);
log.info("Queued {} treasury clone account records", treasuryClones.size());
treasuryClones = null;
}

if (!stakingAccounts.isEmpty()) {
final var implicitAutoRenewPeriod = FUNDING_ACCOUNT_EXPIRY - firstConsensusTime.getEpochSecond();
createAccountRecordBuilders(stakingAccounts, context, STAKING_MEMO, implicitAutoRenewPeriod);
createAccountRecordBuilders(
stakingAccounts, context, STAKING_MEMO, implicitAutoRenewPeriod, exchangeRateSet);
log.info(" - Queued {} staking account records", stakingAccounts.size());
stakingAccounts = null;
}

if (!miscAccounts.isEmpty()) {
createAccountRecordBuilders(miscAccounts, context, null);
createAccountRecordBuilders(miscAccounts, context, null, exchangeRateSet);
log.info("Queued {} misc account records", miscAccounts.size());
miscAccounts = null;
}

if (!treasuryClones.isEmpty()) {
createAccountRecordBuilders(treasuryClones, context, TREASURY_CLONE_MEMO);
log.info("Queued {} treasury clone account records", treasuryClones.size());
treasuryClones = null;
}

if (!blocklistAccounts.isEmpty()) {
createAccountRecordBuilders(blocklistAccounts, context, null);
createAccountRecordBuilders(blocklistAccounts, context, null, exchangeRateSet);
log.info("Queued {} blocklist account records", blocklistAccounts.size());
blocklistAccounts = null;
}
Expand Down Expand Up @@ -359,20 +362,22 @@ private void blocklistAccounts(@NonNull final SortedSet<Account> accounts) {
private void createAccountRecordBuilders(
@NonNull final SortedSet<Account> map,
@NonNull final TokenContext context,
@Nullable final String recordMemo) {
createAccountRecordBuilders(map, context, recordMemo, null);
@Nullable final String recordMemo,
@NonNull final ExchangeRateSet exchangeRateSet) {
createAccountRecordBuilders(map, context, recordMemo, null, exchangeRateSet);
}

private void createAccountRecordBuilders(
@NonNull final SortedSet<Account> accts,
@NonNull final TokenContext context,
@Nullable final String recordMemo,
@Nullable final Long overrideAutoRenewPeriod) {
@Nullable final Long overrideAutoRenewPeriod,
@NonNull final ExchangeRateSet exchangeRateSet) {
for (final Account account : accts) {
// Since this is only called at genesis, the active savepoint's preceding record capacity will be
// Integer.MAX_VALUE and this will never fail with MAX_CHILD_RECORDS_EXCEEDED (c.f., HandleWorkflow)
final var recordBuilder = context.addPrecedingChildRecordBuilder(GenesisAccountStreamBuilder.class);
recordBuilder.accountID(account.accountId());
recordBuilder.accountID(account.accountIdOrThrow()).exchangeRate(exchangeRateSet);
if (recordMemo != null) {
recordBuilder.memo(recordMemo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,22 +89,24 @@ public void process(
}
if (shouldExport) {
try {
// handle staking updates
stakingCalculator.updateNodes(tokenContext);
stack.commitFullStack();
// Update the exchange rate
exchangeRateManager.updateMidnightRates(stack);
stack.commitSystemStateChanges();
} catch (final Exception e) {
// If anything goes wrong, we log the error and continue
logger.error("CATASTROPHIC failure updating end-of-day stakes", e);
logger.error("CATASTROPHIC failure updating midnight rates", e);
stack.rollbackFullStack();
}

try {
// Update the exchange rate
exchangeRateManager.updateMidnightRates(stack);
stack.commitFullStack();
// handle staking updates
final var streamBuilder =
stakingCalculator.updateNodes(tokenContext, exchangeRateManager.exchangeRates());
if (streamBuilder != null) {
stack.commitTransaction(streamBuilder);
}
} catch (final Exception e) {
// If anything goes wrong, we log the error and continue
logger.error("CATASTROPHIC failure updating midnight rates", e);
logger.error("CATASTROPHIC failure updating end-of-day stakes", e);
stack.rollbackFullStack();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,22 @@

package com.hedera.node.app.blocks.impl;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.hedera.hapi.block.stream.output.StateIdentifier;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

class BlockImplUtilsTest {
@Test
Expand Down Expand Up @@ -62,4 +70,60 @@ void testCombineWithNull() {
assertThrows(NullPointerException.class, () -> BlockImplUtils.combine(null, new byte[0]));
assertThrows(NullPointerException.class, () -> BlockImplUtils.combine(new byte[0], null));
}

@ParameterizedTest
@MethodSource("stateIdsByName")
void stateIdsByNameAsExpected(@NonNull final String stateName, @NonNull final StateIdentifier stateId) {
final var parts = stateName.split("\\.");
assertThat(BlockImplUtils.stateIdFor(parts[0], parts[1])).isEqualTo(stateId.protoOrdinal());
}

public static Stream<Arguments> stateIdsByName() {
return Arrays.stream(StateIdentifier.values()).map(stateId -> Arguments.of(nameOf(stateId), stateId));
}

private static String nameOf(@NonNull final StateIdentifier stateId) {
return switch (stateId) {
case STATE_ID_NODES -> "AddressBookService.NODES";
case STATE_ID_BLOCK_INFO -> "BlockRecordService.BLOCKS";
case STATE_ID_RUNNING_HASHES -> "BlockRecordService.RUNNING_HASHES";
case STATE_ID_BLOCK_STREAM_INFO -> "BlockStreamService.BLOCK_STREAM_INFO";
case STATE_ID_CONGESTION_STARTS -> "CongestionThrottleService.CONGESTION_LEVEL_STARTS";
case STATE_ID_THROTTLE_USAGE -> "CongestionThrottleService.THROTTLE_USAGE_SNAPSHOTS";
case STATE_ID_TOPICS -> "ConsensusService.TOPICS";
case STATE_ID_CONTRACT_BYTECODE -> "ContractService.BYTECODE";
case STATE_ID_CONTRACT_STORAGE -> "ContractService.STORAGE";
case STATE_ID_ENTITY_ID -> "EntityIdService.ENTITY_ID";
case STATE_ID_MIDNIGHT_RATES -> "FeeService.MIDNIGHT_RATES";
case STATE_ID_FILES -> "FileService.FILES";
case STATE_ID_UPGRADE_DATA_150 -> "FileService.UPGRADE_DATA[FileID[shardNum=0, realmNum=0, fileNum=150]]";
case STATE_ID_UPGRADE_DATA_151 -> "FileService.UPGRADE_DATA[FileID[shardNum=0, realmNum=0, fileNum=151]]";
case STATE_ID_UPGRADE_DATA_152 -> "FileService.UPGRADE_DATA[FileID[shardNum=0, realmNum=0, fileNum=152]]";
case STATE_ID_UPGRADE_DATA_153 -> "FileService.UPGRADE_DATA[FileID[shardNum=0, realmNum=0, fileNum=153]]";
case STATE_ID_UPGRADE_DATA_154 -> "FileService.UPGRADE_DATA[FileID[shardNum=0, realmNum=0, fileNum=154]]";
case STATE_ID_UPGRADE_DATA_155 -> "FileService.UPGRADE_DATA[FileID[shardNum=0, realmNum=0, fileNum=155]]";
case STATE_ID_UPGRADE_DATA_156 -> "FileService.UPGRADE_DATA[FileID[shardNum=0, realmNum=0, fileNum=156]]";
case STATE_ID_UPGRADE_DATA_157 -> "FileService.UPGRADE_DATA[FileID[shardNum=0, realmNum=0, fileNum=157]]";
case STATE_ID_UPGRADE_DATA_158 -> "FileService.UPGRADE_DATA[FileID[shardNum=0, realmNum=0, fileNum=158]]";
case STATE_ID_UPGRADE_DATA_159 -> "FileService.UPGRADE_DATA[FileID[shardNum=0, realmNum=0, fileNum=159]]";
case STATE_ID_UPGRADE_FILE -> "FileService.UPGRADE_FILE";
case STATE_ID_FREEZE_TIME -> "FreezeService.FREEZE_TIME";
case STATE_ID_UPGRADE_FILE_HASH -> "FreezeService.UPGRADE_FILE_HASH";
case STATE_ID_PLATFORM_STATE -> "PlatformStateService.PLATFORM_STATE";
case STATE_ID_ROSTER_STATE -> "RosterService.ROSTER_STATE";
case STATE_ID_ROSTERS -> "RosterService.ROSTERS";
case STATE_ID_TRANSACTION_RECEIPTS_QUEUE -> "RecordCache.TransactionReceiptQueue";
case STATE_ID_SCHEDULES_BY_EQUALITY -> "ScheduleService.SCHEDULES_BY_EQUALITY";
case STATE_ID_SCHEDULES_BY_EXPIRY -> "ScheduleService.SCHEDULES_BY_EXPIRY_SEC";
case STATE_ID_SCHEDULES_BY_ID -> "ScheduleService.SCHEDULES_BY_ID";
case STATE_ID_ACCOUNTS -> "TokenService.ACCOUNTS";
case STATE_ID_ALIASES -> "TokenService.ALIASES";
case STATE_ID_NFTS -> "TokenService.NFTS";
case STATE_ID_PENDING_AIRDROPS -> "TokenService.PENDING_AIRDROPS";
case STATE_ID_STAKING_INFO -> "TokenService.STAKING_INFOS";
case STATE_ID_NETWORK_REWARDS -> "TokenService.STAKING_NETWORK_REWARDS";
case STATE_ID_TOKEN_RELATIONS -> "TokenService.TOKEN_RELS";
case STATE_ID_TOKENS -> "TokenService.TOKENS";
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import com.hedera.hapi.node.state.blockstream.BlockStreamInfo;
import com.hedera.node.config.testfixtures.HederaTestConfigBuilder;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import com.swirlds.config.api.Configuration;
import com.swirlds.state.spi.MigrationContext;
import com.swirlds.state.spi.StateDefinition;
Expand Down Expand Up @@ -87,6 +86,6 @@ void testMigration() {
verify(mockBlockStreamInfo).put(captor.capture());

BlockStreamInfo blockInfoCapture = captor.getValue();
assertEquals(new BlockStreamInfo(0, null, Bytes.EMPTY, Bytes.EMPTY), blockInfoCapture);
assertEquals(BlockStreamInfo.DEFAULT, blockInfoCapture);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.mockito.Mockito.verify;

import com.hedera.node.app.roster.schemas.V0540RosterSchema;
import com.swirlds.common.RosterStateId;
import com.swirlds.state.spi.Schema;
import com.swirlds.state.spi.SchemaRegistry;
import org.assertj.core.api.Assertions;
Expand Down Expand Up @@ -66,6 +65,6 @@ void registerSchemasRegistersTokenSchema() {

@Test
void testServiceNameReturnsCorrectName() {
assertThat(rosterService.getServiceName()).isEqualTo(RosterStateId.NAME);
assertThat(rosterService.getServiceName()).isEqualTo(RosterServiceImpl.NAME);
}
}
Loading

0 comments on commit f9f93bd

Please sign in to comment.