Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: support restarting from RECORDS -> BOTH #15904

Merged
merged 3 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import static com.hedera.hapi.block.stream.output.StateIdentifier.STATE_ID_BLOCK_STREAM_INFO;
import static com.hedera.node.app.blocks.impl.BlockImplUtils.combine;
import static com.hedera.node.app.blocks.impl.ConcurrentStreamingTreeHasher.rootHashFrom;
import static com.hedera.node.app.blocks.schemas.V0540BlockStreamSchema.BLOCK_STREAM_INFO_KEY;
import static com.hedera.node.app.blocks.schemas.V0560BlockStreamSchema.BLOCK_STREAM_INFO_KEY;
import static com.hedera.node.app.info.UnavailableNetworkInfo.UNAVAILABLE_NETWORK_INFO;
import static com.hedera.node.app.records.impl.BlockRecordInfoUtils.blockHashByBlockNumber;
import static com.hedera.node.app.records.schemas.V0490BlockRecordSchema.BLOCK_INFO_STATE_KEY;
Expand Down Expand Up @@ -347,7 +347,7 @@ public Hedera(
new SignatureExpanderImpl(),
new SignatureVerifierImpl(CryptographyHolder.get())));
contractServiceImpl = new ContractServiceImpl(appContext);
blockStreamService = new BlockStreamService(bootstrapConfig);
blockStreamService = new BlockStreamService();
// Register all service schema RuntimeConstructable factories before platform init
Set.of(
new EntityIdService(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@

package com.hedera.node.app.blocks;

import static com.hedera.node.config.types.StreamMode.RECORDS;
import static java.util.Objects.requireNonNull;

import com.hedera.node.app.blocks.schemas.V0540BlockStreamSchema;
import com.hedera.node.config.data.BlockStreamConfig;
import com.hedera.node.app.blocks.schemas.V0560BlockStreamSchema;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import com.swirlds.config.api.Configuration;
import com.swirlds.state.spi.SchemaRegistry;
import com.swirlds.state.spi.Service;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand All @@ -42,18 +39,9 @@ public class BlockStreamService implements Service {

public static final String NAME = "BlockStreamService";

private final boolean enabled;

@Nullable
private Bytes migratedLastBlockHash;

/**
* Service constructor.
*/
public BlockStreamService(final Configuration config) {
this.enabled = config.getConfigData(BlockStreamConfig.class).streamMode() != RECORDS;
}

@NonNull
@Override
public String getServiceName() {
Expand All @@ -63,9 +51,7 @@ public String getServiceName() {
@Override
public void registerSchemas(@NonNull final SchemaRegistry registry) {
requireNonNull(registry);
if (enabled) {
registry.register(new V0540BlockStreamSchema(this::setMigratedLastBlockHash));
}
registry.register(new V0560BlockStreamSchema(this::setMigratedLastBlockHash));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import static com.hedera.hapi.util.HapiUtils.asInstant;
import static com.hedera.node.app.blocks.impl.BlockImplUtils.appendHash;
import static com.hedera.node.app.blocks.impl.BlockImplUtils.combine;
import static com.hedera.node.app.blocks.schemas.V0540BlockStreamSchema.BLOCK_STREAM_INFO_KEY;
import static com.hedera.node.app.blocks.schemas.V0560BlockStreamSchema.BLOCK_STREAM_INFO_KEY;
import static com.hedera.node.app.hapi.utils.CommonUtils.noThrowSha384HashOf;
import static com.hedera.node.app.records.impl.BlockRecordInfoUtils.HASH_SIZE;
import static com.swirlds.platform.state.SwirldStateManagerUtils.isInFreezePeriod;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
* <li>The <b>trailing 256 block hashes</b>, used to implement the EVM {@code BLOCKHASH} opcode.</li>
* </ol>
*/
public class V0540BlockStreamSchema extends Schema {
public class V0560BlockStreamSchema extends Schema {
public static final String BLOCK_STREAM_INFO_KEY = "BLOCK_STREAM_INFO";
private static final String SHARED_BLOCK_RECORD_INFO = "SHARED_BLOCK_RECORD_INFO";
private static final String SHARED_RUNNING_HASHES = "SHARED_RUNNING_HASHES";
Expand All @@ -63,14 +63,14 @@ public class V0540BlockStreamSchema extends Schema {
* The version of the schema.
*/
private static final SemanticVersion VERSION =
SemanticVersion.newBuilder().major(0).minor(54).patch(0).build();
SemanticVersion.newBuilder().major(0).minor(56).patch(0).build();

private final Consumer<Bytes> migratedBlockHashConsumer;

/**
* Schema constructor.
*/
public V0540BlockStreamSchema(@NonNull final Consumer<Bytes> migratedBlockHashConsumer) {
public V0560BlockStreamSchema(@NonNull final Consumer<Bytes> migratedBlockHashConsumer) {
super(VERSION);
this.migratedBlockHashConsumer = requireNonNull(migratedBlockHashConsumer);
}
Expand All @@ -81,13 +81,14 @@ public V0540BlockStreamSchema(@NonNull final Consumer<Bytes> migratedBlockHashCo
}

@Override
public void migrate(@NonNull final MigrationContext ctx) {
public void restart(@NonNull final MigrationContext ctx) {
requireNonNull(ctx);
final var state = ctx.newStates().getSingleton(BLOCK_STREAM_INFO_KEY);
if (ctx.previousVersion() == null) {
state.put(BlockStreamInfo.DEFAULT);
} else {
final var blockStreamInfo = state.get();
// This will be null if the previous version is before 0.54.0
// This will be null if the previous version is before 0.56.0
if (blockStreamInfo == null) {
final BlockInfo blockInfo =
(BlockInfo) requireNonNull(ctx.sharedValues().get(SHARED_BLOCK_RECORD_INFO));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.hedera.hapi.node.base.Timestamp;
import com.hedera.node.app.records.impl.BlockRecordManagerImpl;
import com.hedera.node.app.records.schemas.V0490BlockRecordSchema;
import com.hedera.node.app.records.schemas.V0540BlockRecordSchema;
import com.hedera.node.app.records.schemas.V0560BlockRecordSchema;
import com.swirlds.state.spi.SchemaRegistry;
import com.swirlds.state.spi.Service;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand Down Expand Up @@ -48,6 +48,6 @@ public String getServiceName() {
@Override
public void registerSchemas(@NonNull final SchemaRegistry registry) {
registry.register(new V0490BlockRecordSchema());
registry.register(new V0540BlockRecordSchema());
registry.register(new V0560BlockRecordSchema());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,25 @@
import com.swirlds.state.spi.MigrationContext;
import com.swirlds.state.spi.Schema;
import edu.umd.cs.findbugs.annotations.NonNull;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class V0540BlockRecordSchema extends Schema {
private static final Logger logger = LogManager.getLogger(V0540BlockRecordSchema.class);
public class V0560BlockRecordSchema extends Schema {
/**
* The version of the schema.
*/
private static final SemanticVersion VERSION =
SemanticVersion.newBuilder().major(0).minor(54).patch(0).build();
SemanticVersion.newBuilder().major(0).minor(56).patch(0).build();

private static final String SHARED_BLOCK_RECORD_INFO = "SHARED_BLOCK_RECORD_INFO";
private static final String SHARED_RUNNING_HASHES = "SHARED_RUNNING_HASHES";

public V0540BlockRecordSchema() {
public V0560BlockRecordSchema() {
super(VERSION);
}

/**
* {@inheritDoc}
* */
@Override
public void migrate(@NonNull final MigrationContext ctx) {
final var isGenesis = ctx.previousVersion() == null;
if (!isGenesis) {
public void restart(@NonNull final MigrationContext ctx) {
if (ctx.previousVersion() != null) {
// Upcoming BlockStreamService schemas may need migration info
final var blocksState = ctx.newStates().getSingleton(BLOCK_INFO_STATE_KEY);
final var runningHashesState = ctx.newStates().getSingleton(RUNNING_HASHES_STATE_KEY);
ctx.sharedValues().put(SHARED_BLOCK_RECORD_INFO, blocksState.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,11 @@

package com.hedera.node.app.blocks;

import static com.hedera.node.app.fixtures.AppTestBase.DEFAULT_CONFIG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;

import com.hedera.node.app.blocks.schemas.V0540BlockStreamSchema;
import com.hedera.node.config.testfixtures.HederaTestConfigBuilder;
import com.hedera.node.app.blocks.schemas.V0560BlockStreamSchema;
import com.swirlds.state.spi.SchemaRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -35,43 +32,17 @@ final class BlockStreamServiceTest {
@Mock
private SchemaRegistry schemaRegistry;

private BlockStreamService subject;
private final BlockStreamService subject = new BlockStreamService();

@Test
void serviceNameAsExpected() {
givenDisabledSubject();

assertThat(subject.getServiceName()).isEqualTo("BlockStreamService");
}

@Test
void enabledSubjectRegistersV0540Schema() {
givenEnabledSubject();

subject.registerSchemas(schemaRegistry);

verify(schemaRegistry).register(argThat(s -> s instanceof V0540BlockStreamSchema));
}

@Test
void disabledSubjectDoesNotRegisterSchema() {
givenDisabledSubject();

subject.registerSchemas(schemaRegistry);

verifyNoInteractions(schemaRegistry);

assertThat(subject.migratedLastBlockHash()).isEmpty();
}

private void givenEnabledSubject() {
final var testConfig = HederaTestConfigBuilder.create()
.withValue("blockStream.streamMode", "BOTH")
.getOrCreateConfig();
subject = new BlockStreamService(testConfig);
}

private void givenDisabledSubject() {
subject = new BlockStreamService(DEFAULT_CONFIG);
verify(schemaRegistry).register(argThat(s -> s instanceof V0560BlockStreamSchema));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import static com.hedera.node.app.blocks.BlockStreamService.FAKE_RESTART_BLOCK_HASH;
import static com.hedera.node.app.blocks.impl.BlockImplUtils.appendHash;
import static com.hedera.node.app.blocks.impl.BlockImplUtils.combine;
import static com.hedera.node.app.blocks.schemas.V0540BlockStreamSchema.BLOCK_STREAM_INFO_KEY;
import static com.hedera.node.app.blocks.schemas.V0560BlockStreamSchema.BLOCK_STREAM_INFO_KEY;
import static com.hedera.node.app.fixtures.AppTestBase.DEFAULT_CONFIG;
import static com.hedera.node.app.hapi.utils.CommonUtils.noThrowSha384HashOf;
import static com.swirlds.platform.state.service.schemas.V0540PlatformStateSchema.PLATFORM_STATE_KEY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import com.hedera.hapi.node.state.primitives.ProtoBytes;
import com.hedera.hapi.node.state.primitives.ProtoString;
import com.hedera.node.app.blocks.BlockStreamService;
import com.hedera.node.app.blocks.schemas.V0540BlockStreamSchema;
import com.hedera.node.app.blocks.schemas.V0560BlockStreamSchema;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import java.time.Instant;
import java.util.List;
Expand Down Expand Up @@ -59,7 +59,7 @@ void targetTypesAreSingletonAndQueue() {
@Test
void understandsStateIds() {
final var service = BlockStreamService.NAME;
final var stateKey = V0540BlockStreamSchema.BLOCK_STREAM_INFO_KEY;
final var stateKey = V0560BlockStreamSchema.BLOCK_STREAM_INFO_KEY;
assertEquals(stateIdFor(service, stateKey), listener.stateIdFor(service, stateKey));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.hedera.node.app.blocks.schemas;

import static com.hedera.node.app.blocks.schemas.V0540BlockStreamSchema.BLOCK_STREAM_INFO_KEY;
import static com.hedera.node.app.blocks.schemas.V0560BlockStreamSchema.BLOCK_STREAM_INFO_KEY;
import static com.hedera.node.app.fixtures.AppTestBase.DEFAULT_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -43,7 +43,7 @@
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
public class V0540BlockStreamSchemaTest {
public class V0560BlockStreamSchemaTest {
@Mock
private MigrationContext migrationContext;

Expand All @@ -56,16 +56,16 @@ public class V0540BlockStreamSchemaTest {
@Mock
private WritableSingletonState<BlockStreamInfo> state;

private V0540BlockStreamSchema subject;
private V0560BlockStreamSchema subject;

@BeforeEach
void setUp() {
subject = new V0540BlockStreamSchema(migratedBlockHashConsumer);
subject = new V0560BlockStreamSchema(migratedBlockHashConsumer);
}

@Test
void versionIsV0540() {
assertEquals(new SemanticVersion(0, 54, 0, "", ""), subject.getVersion());
void versionIsV0560() {
assertEquals(new SemanticVersion(0, 56, 0, "", ""), subject.getVersion());
}

@Test
Expand All @@ -83,7 +83,7 @@ void createsDefaultInfoAtGenesis() {
given(writableStates.<BlockStreamInfo>getSingleton(BLOCK_STREAM_INFO_KEY))
.willReturn(state);

subject.migrate(migrationContext);
subject.restart(migrationContext);

verify(state).put(BlockStreamInfo.DEFAULT);
}
Expand Down Expand Up @@ -112,7 +112,7 @@ void assumesMigrationIfNotGenesisAndStateIsNull() {
.willReturn(state);
given(migrationContext.sharedValues()).willReturn(sharedValues);

subject.migrate(migrationContext);
subject.restart(migrationContext);

verify(migratedBlockHashConsumer).accept(Bytes.fromHex("abcd".repeat(24)));
final var expectedInfo = new BlockStreamInfo(
Expand All @@ -136,7 +136,7 @@ void migrationIsNoopIfNotGenesisAndInfoIsNonNull() {
.willReturn(state);
given(state.get()).willReturn(BlockStreamInfo.DEFAULT);

subject.migrate(migrationContext);
subject.restart(migrationContext);

verifyNoInteractions(migratedBlockHashConsumer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ void setUpEach() throws Exception {
.withConfigValue("hedera.recordStream.signatureFileVersion", 6)
.withConfigValue("hedera.recordStream.compressFilesOnCreation", true)
.withConfigValue("hedera.recordStream.sidecarMaxSizeMb", 256)
.withConfigValue("blockStream.streamMode", "BOTH")
.withService(new BlockRecordService())
.withService(PLATFORM_STATE_SERVICE)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import com.hedera.hapi.node.state.blockrecords.RunningHashes;
import com.hedera.node.app.records.BlockRecordService;
import com.hedera.node.app.records.schemas.V0490BlockRecordSchema;
import com.hedera.node.app.records.schemas.V0540BlockRecordSchema;
import com.hedera.node.app.records.schemas.V0560BlockRecordSchema;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import com.swirlds.state.spi.MigrationContext;
import com.swirlds.state.spi.Schema;
Expand Down Expand Up @@ -89,7 +89,7 @@ void testRegisterSchemas() {
runningHashesCapture.getValue());
assertEquals(new BlockInfo(-1, EPOCH, Bytes.EMPTY, EPOCH, false, EPOCH), blockInfoCapture.getValue());
} else {
assertThat(schema).isInstanceOf(V0540BlockRecordSchema.class);
assertThat(schema).isInstanceOf(V0560BlockRecordSchema.class);
}
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ void setUp() {

appBuilder = appBuilder()
.withHapiVersion(VERSION)
.withSoftwareVersion(VERSION)
.withConfigValue("hedera.recordStream.enabled", true)
.withConfigValue("hedera.recordStream.logDir", tempDir.toString())
.withConfigValue("hedera.recordStream.sidecarDir", "sidecar")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ public void commit() {
}

public static final class TestAppBuilder {
private SemanticVersion softwareVersion = CURRENT_VERSION;
private SemanticVersion hapiVersion = CURRENT_VERSION;
private Set<Service> services = new LinkedHashSet<>();
private TestConfigBuilder configBuilder = HederaTestConfigBuilder.create();
Expand All @@ -259,11 +258,6 @@ public TestAppBuilder withHapiVersion(@NonNull final SemanticVersion version) {
return this;
}

public TestAppBuilder withSoftwareVersion(@NonNull final SemanticVersion version) {
this.softwareVersion = version;
return this;
}

public TestAppBuilder withConfigSource(@NonNull final ConfigSource source) {
configBuilder.withSource(source);
return this;
Expand Down
Loading
Loading