From ac58d0b7be4d645e7280f4da4f10c683d8349f49 Mon Sep 17 00:00:00 2001 From: Jasper Potts <1466205+jasperpotts@users.noreply.github.com> Date: Fri, 20 Dec 2024 10:08:45 -0800 Subject: [PATCH] feat: Tool for converting Record Files to Block Stream (#389) Signed-off-by: jasperpotts Co-authored-by: jasperpotts --- .gitignore | 1 + .../com.hedera.block.jpms-modules.gradle.kts | 29 +- settings.gradle.kts | 2 +- stream/build.gradle.kts | 2 +- tool.sh | 16 + tools/README.md | 136 +++++++- tools/build.gradle.kts | 69 +++- tools/docs/quickstart.md | 7 + .../hedera/block/tools/BlockStreamTool.java | 15 +- .../record2blocks/Record2BlockCommand.java | 311 +++++++++++++++++ .../record2blocks/gcp/AddNewerBlockTimes.java | 148 ++++++++ .../record2blocks/gcp/MainNetBucket.java | 314 +++++++++++++++++ .../mirrornode/ExtractBlockTimes.java | 122 +++++++ .../mirrornode/FetchBlockQuery.java | 87 +++++ .../mirrornode/FetchMirrorNodeRecordsCsv.java | 162 +++++++++ .../mirrornode/ValidateBlockTimes.java | 93 +++++ .../record2blocks/model/BlockInfo.java | 141 ++++++++ .../record2blocks/model/BlockTimes.java | 71 ++++ .../record2blocks/model/ChainFile.java | 121 +++++++ .../model/ChainFileAndCount.java | 25 ++ .../model/NumberedSidecarFile.java | 78 +++++ .../model/ParsedSignatureFile.java | 330 ++++++++++++++++++ .../record2blocks/model/RecordFileInfo.java | 104 ++++++ .../record2blocks/util/BlockWriter.java | 128 +++++++ .../record2blocks/util/RecordFileDates.java | 125 +++++++ tools/src/main/java/module-info.java | 15 - 26 files changed, 2620 insertions(+), 32 deletions(-) create mode 100755 tool.sh create mode 100644 tools/src/main/java/com/hedera/block/tools/commands/record2blocks/Record2BlockCommand.java create mode 100644 tools/src/main/java/com/hedera/block/tools/commands/record2blocks/gcp/AddNewerBlockTimes.java create mode 100644 tools/src/main/java/com/hedera/block/tools/commands/record2blocks/gcp/MainNetBucket.java create mode 100644 tools/src/main/java/com/hedera/block/tools/commands/record2blocks/mirrornode/ExtractBlockTimes.java create mode 100644 tools/src/main/java/com/hedera/block/tools/commands/record2blocks/mirrornode/FetchBlockQuery.java create mode 100644 tools/src/main/java/com/hedera/block/tools/commands/record2blocks/mirrornode/FetchMirrorNodeRecordsCsv.java create mode 100644 tools/src/main/java/com/hedera/block/tools/commands/record2blocks/mirrornode/ValidateBlockTimes.java create mode 100644 tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/BlockInfo.java create mode 100644 tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/BlockTimes.java create mode 100644 tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/ChainFile.java create mode 100644 tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/ChainFileAndCount.java create mode 100644 tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/NumberedSidecarFile.java create mode 100644 tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/ParsedSignatureFile.java create mode 100644 tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/RecordFileInfo.java create mode 100644 tools/src/main/java/com/hedera/block/tools/commands/record2blocks/util/BlockWriter.java create mode 100644 tools/src/main/java/com/hedera/block/tools/commands/record2blocks/util/RecordFileDates.java delete mode 100644 tools/src/main/java/module-info.java diff --git a/.gitignore b/.gitignore index f78eebcac..ab413b712 100644 --- 
a/.gitignore +++ b/.gitignore @@ -55,6 +55,7 @@ gradle-app.setting .env server/data/ +/tools/data/ # manual test files server/src/test/resources/test_output/ diff --git a/buildSrc/src/main/kotlin/com.hedera.block.jpms-modules.gradle.kts b/buildSrc/src/main/kotlin/com.hedera.block.jpms-modules.gradle.kts index 14ae2e6c5..fe391c123 100644 --- a/buildSrc/src/main/kotlin/com.hedera.block.jpms-modules.gradle.kts +++ b/buildSrc/src/main/kotlin/com.hedera.block.jpms-modules.gradle.kts @@ -68,6 +68,15 @@ jvmDependencyConflicts.patch { module("com.google.protobuf:protobuf-java-util") { annotationLibraries.forEach { removeDependency(it) } } + module("com.google.cloud:google-cloud-storage") { + annotationLibraries.forEach { removeDependency(it) } + } + module("com.google.api.grpc:proto-google-cloud-monitoring-v3") { + annotationLibraries.forEach { removeDependency(it) } + } + module("com.google.cloud:google-cloud-monitoring") { + annotationLibraries.forEach { removeDependency(it) } + } module("io.prometheus:simpleclient") { removeDependency("io.prometheus:simpleclient_tracer_otel") removeDependency("io.prometheus:simpleclient_tracer_otel_agent") @@ -124,7 +133,10 @@ extraJavaModuleInfo { exportAllPackages() mergeJar("javax.annotation:javax.annotation-api") } - module("com.google.errorprone:error_prone_annotations", "com.google.errorprone.annotations") + module("com.google.errorprone:error_prone_annotations", "com.google.errorprone.annotations") { + exportAllPackages() + patchRealModule() + } module("com.google.j2objc:j2objc-annotations", "com.google.j2objc.annotations") module("com.google.protobuf:protobuf-java", "com.google.protobuf") { exportAllPackages() @@ -142,12 +154,18 @@ extraJavaModuleInfo { module("io.perfmark:perfmark-api", "io.perfmark") module("javax.inject:javax.inject", "javax.inject") - module("commons-codec:commons-codec", "org.apache.commons.codec") + module("commons-codec:commons-codec", "org.apache.commons.codec") { + exportAllPackages() + patchRealModule() + } module("org.apache.commons:commons-math3", "org.apache.commons.math3") module("org.apache.commons:commons-collections4", "org.apache.commons.collections4") module("com.esaulpaugh:headlong", "headlong") - module("org.checkerframework:checker-qual", "org.checkerframework.checker.qual") + module("org.checkerframework:checker-qual", "org.checkerframework.checker.qual") { + exportAllPackages() + patchRealModule() + } module("net.i2p.crypto:eddsa", "net.i2p.crypto.eddsa") module("org.jetbrains:annotations", "org.jetbrains.annotations") module("org.antlr:antlr4-runtime", "org.antlr.antlr4.runtime") @@ -167,7 +185,10 @@ extraJavaModuleInfo { requireAllDefinedDependencies() requires("jdk.httpserver") } - + module("com.google.j2objc:j2objc-annotations", "com.google.j2objc.annotations") { + exportAllPackages() + patchRealModule() + } // Annotation processing only module("com.google.auto.service:auto-service-annotations", "com.google.auto.service") module("com.google.auto.service:auto-service", "com.google.auto.service.processor") diff --git a/settings.gradle.kts b/settings.gradle.kts index 37e1a23fb..b83e4f619 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -49,7 +49,7 @@ dependencyResolutionManagement { val protobufVersion = "4.28.2" val helidonVersion = "4.1.1" val grpcIoVersion = "1.65.1" - var pbjVersion = "0.9.11" + val pbjVersion = "0.9.11" // Compile time dependencies version("io.helidon.webserver.http2", helidonVersion) diff --git a/stream/build.gradle.kts b/stream/build.gradle.kts index 64f5f971c..20a1bdfab 
100644 --- a/stream/build.gradle.kts +++ b/stream/build.gradle.kts @@ -33,7 +33,7 @@ tasks.withType().configureEach { tasks.cloneHederaProtobufs { // uncomment below to use a specific tag // tag = "v0.53.0" or a specific commit like "0047255" - tag = "1033f10" + tag = "eab8b58e30336512bcf387c803e6fc86b6ebe010" // uncomment below to use a specific branch // branch = "main" diff --git a/tool.sh b/tool.sh new file mode 100755 index 000000000..251ec9dc7 --- /dev/null +++ b/tool.sh @@ -0,0 +1,16 @@ +#!/bin/bash +# run gradle jar build and send output to /dev/null +./gradlew -q tool:shadowJar > /dev/null +# check if last command failed and exit if so +if [ $? -ne 0 ]; then + echo "Build failed" + exit 1 +fi +# change to the tools directory +pushd tools > /dev/null +# find the jar name in the build/libs directory +JAR=$(find build/libs -name 'tools-*-all.jar') +# run the command line tool built jar file forwarding all arguments +java -jar $JAR "$@" +# change back to the original directory +popd > /dev/null diff --git a/tools/README.md b/tools/README.md index d479002fc..a62153f85 100644 --- a/tools/README.md +++ b/tools/README.md @@ -3,10 +3,10 @@ ## Table of Contents 1. [Overview](#overview) -1. [Subcommands](#subcommands) +2. [Running from command line](#running-from-command-line) +3. [Subcommands](#subcommands) 1. [The `json` Subcommand](#the-json-subcommand) - 1. [The `info` Subcommand](#the-info-subcommand) -1. [Running from command line](#running-from-command-line) + 2. [The `info` Subcommand](#the-info-subcommand) ## Overview @@ -14,17 +14,26 @@ This subproject provides command line tools for working with block stream files uses [picocli](https://picocli.info) to provide a command line interface which makes it easy to extend and add new subcommands or options. +## Running from command line + +Refer to the [Quickstart](docs/quickstart.md) for a quick guide on how to run the tools CLI. + ## Subcommands The following subcommands are available: - `json` - Converts a binary block stream to JSON - `info` - Prints info for block files +- `record2block` - Converts a historical record stream files into blocks +- `fetchRecordsCsv` - Download mirror node record table CSV dump from GCP bucket +- `extractBlockTimes` - Extract block times from mirror node records csv file +- `validateBlockTimes` - Validates a block times file as produced by `extractBlockTimes` +- `addNewerBlockTimes` - Extends the block times file with newer block times ### The `json` Subcommand Converts a binary block stream to JSON -`Usage: subcommands json [-t] [-ms=] [...]` +`Usage: json [-t] [-ms=] [...]` **Options:** @@ -42,7 +51,7 @@ transactions human-readable. Prints info for block files -`Usage: subcommands info [-c] [-ms=] [-o=] [...]` +`Usage: info [-c] [-ms=] [-o=] [...]` **Options:** @@ -58,6 +67,119 @@ Prints info for block files - `...` - The block files or directories of block files to print info for -## Running from command line +### The `record2block` Subcommand -Refer to the [Quickstart](docs/quickstart.md) for a quick guide on how to run the tools CLI. +Converts a historical record stream files into blocks. This depends on the `block_times.bin` file being present. It can +be created by running the other commands `fetchRecordsCsv`, `extractBlockTimes` and `addNewerBlockTimes` in that order. +It can also be validated by running the `validateBlockTimes` command. + +This command depends on reading data from public requester pays Google Cloud buckets. To do that it needs you to be +authenticated with the Google Cloud SDK. 
You can authenticate with `gcloud auth application-default login` or +`gcloud auth login` see [Google Documentation](https://cloud.google.com/storage/docs/reference/libraries#authentication) +for more info. + +`Usage: record2block [-s 0] [-e 100] [-j] [-c] [--min-node-account-id=3] [--max-node-account-id=34] [-d ] [--block-times=] ` + +**Options:** + +- `-s ` or `--start-block=` + - The first block number to process + - Default: 0 +- `-e ` or `--end-block=` + - The last block number to process + - Default: 3001 +- `-j` or `--json` + - also output blocks as json, useful for debugging and testing + - Default: false +- `-c` or `--cache-enabled` + - Use local cache for downloaded content, saves cloud costs and bandwidth when testing + - Default: false +- `--min-node-account-id=` + - the account id of the first node in the network + - Default: 3 +- `--max-node-account-id=` + - the account id of the last node in the network + - Default: 34 +- `--data-dir=` + - the data directory for output and temporary files + - Default: "data" +- `--block-times=` + - Path to the block times ".bin" file. + - Default: "data/block_times.bin" + +### The `fetchRecordsCsv` Subcommand + +Download mirror node record table CSV dump from GCP bucket. The records table on mirror node has a row for every block +mirror node knows about. The CSV file is huge 11GB+ in November 2024. This data is important for records to blocks +conversion as we have to make sure the block number assigned for a record file matches what mirror node says as the +source of truth. + +This command depends on reading data from public requester pays Google Cloud buckets. To do that it needs you to be +authenticated with the Google Cloud SDK. You can authenticate with `gcloud auth application-default login` or +`gcloud auth login` see [Google Documentation](https://cloud.google.com/storage/docs/reference/libraries#authentication) +for more info. + +`Usage: fetchRecordsCsv [--record-csv=]` + +**Options:** + +- `--record-csv=` + - Path to the record CSV file. + - Default: "data/record.csv" + +### The `extractBlockTimes` Subcommand + +Extract block times from mirror node records csv file. Reads and produces . We need to +convert the mirror node records CSV because it is huge 11GB+ compressed and too large to fit into RAM, and we can not +random access easily. The only part of the data needed for the records to blocks conversion is the block times. The +block time being the record file time for a given block. The record file consensus time is used as the file name of the +record file in the bucket. + +The block times file is a binary file of longs, each long is the number of nanoseconds for that block after first block +time. So first block = 0, second about 5 seconds later etc. The index is the block number, so block 0 is first long, +block 1 is second block and so on. This file can then be memory mapped and used as fast lookup for block +number(array offset) into block time, i.e. record file name. + +`Usage: extractBlockTimes [--record-csv=] [--block-times=]` + +**Options:** + +- `--record-csv=` + - Path to the record CSV file. + - Default: "data/record.csv" +- `--block-times=` + - Path to the block times ".bin" file. + - Default: "data/block_times.bin" + + +### The `addNewerBlockTimes` Subcommand + +Extends the block times file with newer block times. This is done by listing the record files in the bucket and +counting them for block numbers. It processes day by day, listing one day then appending block times to the block times +file. 
Then at the end of each day it checks the block number it has computed still matches mirror node by using the +mirror node REST API. This whole process can take a long time if the mirror node CSV dump is old. + +This command depends on reading data from public requester pays Google Cloud buckets. To do that it needs you to be +authenticated with the Google Cloud SDK. You can authenticate with `gcloud auth application-default login` or +`gcloud auth login` see [Google Documentation](https://cloud.google.com/storage/docs/reference/libraries#authentication) +for more info. + +`Usage: addNewerBlockTimes [-c] [--min-node-account-id=3] [--max-node-account-id=34] [-d ] [--block-times=]` + +**Options:** + +- `-c` or `--cache-enabled` + - Use local cache for downloaded content, saves cloud costs and bandwidth when testing + - Default: true +- `--min-node-account-id=` + - the account id of the first node in the network + - Default: 3 +- `--max-node-account-id=` + - the account id of the last node in the network + - Default: 34 +- `--data-dir=` + - the data directory for output and temporary files + - Default: "data" +- `--block-times=` + - Path to the block times ".bin" file. + - Default: "data/block_times.bin" diff --git a/tools/build.gradle.kts b/tools/build.gradle.kts index 26a09c31b..946169635 100644 --- a/tools/build.gradle.kts +++ b/tools/build.gradle.kts @@ -1,3 +1,23 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import com.github.jengelman.gradle.plugins.shadow.internal.DefaultDependencyFilter +import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar +import org.gradlex.javamodule.dependencies.tasks.ModuleDirectivesScopeCheck + /* * Copyright (C) 2022-2024 Hedera Hashgraph, LLC * @@ -17,15 +37,33 @@ plugins { id("application") id("com.hedera.block.tools") + id("com.gradleup.shadow") version "8.3.5" } description = "Hedera Block Stream Tools" -application { - mainModule = "com.hedera.block.tools" - mainClass = "com.hedera.block.tools.BlockStreamTool" +application { mainClass = "com.hedera.block.tools.BlockStreamTool" } + +// Generate Manifest with Main-Class and Implementation-Title +tasks.withType().configureEach { + manifest { + attributes( + "Main-Class" to application.mainClass.get(), + "Implementation-Title" to project.name, + "Implementation-Version" to project.version + ) + } } +// Allow non-module Jar +extraJavaModuleInfo { + failOnMissingModuleInfo = false + failOnAutomaticModules = false +} + +// Disable module directives scope check as we are not using modules +tasks.withType().configureEach { enabled = false } + mainModuleInfo { runtimeOnly("com.swirlds.config.impl") runtimeOnly("org.apache.logging.log4j.slf4j2.impl") @@ -33,3 +71,28 @@ mainModuleInfo { } testModuleInfo { requiresStatic("com.github.spotbugs.annotations") } + +dependencies { + implementation(platform("com.google.cloud:libraries-bom:26.49.0")) + implementation("com.google.cloud:google-cloud-storage") + implementation("com.github.luben:zstd-jni:1.5.6-6") + implementation("info.picocli:picocli:4.7.6") + // depend on peer streams gradle module to get access to protobuf generated classes + implementation(project(":stream")) +} + +tasks.withType().configureEach { + group = "shadow" + + // There is an issue in the shadow plugin that it automatically accesses the + // files in 'runtimeClasspath' while Gradle is building the task graph. + // See: https://github.com/GradleUp/shadow/issues/882 + dependencyFilter = NoResolveDependencyFilter() +} + +// Disable dependency resolution as it conflicts with shadow plugin +class NoResolveDependencyFilter : DefaultDependencyFilter(project) { + override fun resolve(configuration: FileCollection): FileCollection { + return configuration + } +} diff --git a/tools/docs/quickstart.md b/tools/docs/quickstart.md index b327ee67e..b4128387e 100644 --- a/tools/docs/quickstart.md +++ b/tools/docs/quickstart.md @@ -17,6 +17,13 @@ > recommended to use the project qualifier (i.e. `:tools:`) for > both simplicity and clarity. +### Easy way for Unix based OSs +There is a command line script for building and running tool, which is located in the root of the repository. It has the +nice extra feature of giving you colored console output. 
+``` +./tool.sh info --help +``` + ### Build the Tools > **NOTE:** if you have not done so already, it is diff --git a/tools/src/main/java/com/hedera/block/tools/BlockStreamTool.java b/tools/src/main/java/com/hedera/block/tools/BlockStreamTool.java index 01d275ad6..347de415a 100644 --- a/tools/src/main/java/com/hedera/block/tools/BlockStreamTool.java +++ b/tools/src/main/java/com/hedera/block/tools/BlockStreamTool.java @@ -18,6 +18,11 @@ import com.hedera.block.tools.commands.BlockInfo; import com.hedera.block.tools.commands.ConvertToJson; +import com.hedera.block.tools.commands.record2blocks.Record2BlockCommand; +import com.hedera.block.tools.commands.record2blocks.gcp.AddNewerBlockTimes; +import com.hedera.block.tools.commands.record2blocks.mirrornode.ExtractBlockTimes; +import com.hedera.block.tools.commands.record2blocks.mirrornode.FetchMirrorNodeRecordsCsv; +import com.hedera.block.tools.commands.record2blocks.mirrornode.ValidateBlockTimes; import picocli.CommandLine; import picocli.CommandLine.Command; @@ -29,7 +34,15 @@ name = "subcommands", mixinStandardHelpOptions = true, version = "BlockStreamTool 0.1", - subcommands = {ConvertToJson.class, BlockInfo.class}) + subcommands = { + ConvertToJson.class, + BlockInfo.class, + Record2BlockCommand.class, + FetchMirrorNodeRecordsCsv.class, + ExtractBlockTimes.class, + ValidateBlockTimes.class, + AddNewerBlockTimes.class + }) public final class BlockStreamTool { /** diff --git a/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/Record2BlockCommand.java b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/Record2BlockCommand.java new file mode 100644 index 000000000..58767a836 --- /dev/null +++ b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/Record2BlockCommand.java @@ -0,0 +1,311 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.hedera.block.tools.commands.record2blocks; + +import static com.hedera.block.tools.commands.record2blocks.mirrornode.FetchBlockQuery.getPreviousHashForBlock; +import static com.hedera.block.tools.commands.record2blocks.util.BlockWriter.writeBlock; +import static com.hedera.block.tools.commands.record2blocks.util.RecordFileDates.blockTimeLongToInstant; + +import com.hedera.block.tools.commands.record2blocks.gcp.MainNetBucket; +import com.hedera.block.tools.commands.record2blocks.model.BlockInfo; +import com.hedera.block.tools.commands.record2blocks.model.BlockTimes; +import com.hedera.block.tools.commands.record2blocks.model.ChainFile; +import com.hedera.block.tools.commands.record2blocks.model.ParsedSignatureFile; +import com.hedera.block.tools.commands.record2blocks.model.RecordFileInfo; +import com.hedera.block.tools.commands.record2blocks.util.BlockWriter.BlockPath; +import com.hedera.hapi.block.stream.Block; +import com.hedera.hapi.block.stream.BlockItem; +import com.hedera.hapi.block.stream.BlockItem.ItemOneOfType; +import com.hedera.hapi.block.stream.RecordFileItem; +import com.hedera.hapi.block.stream.RecordFileSignature; +import com.hedera.hapi.block.stream.output.BlockHeader; +import com.hedera.hapi.node.base.BlockHashAlgorithm; +import com.hedera.hapi.node.base.Timestamp; +import com.hedera.hapi.streams.SidecarFile; +import com.hedera.pbj.runtime.OneOf; +import com.hedera.pbj.runtime.ParseException; +import com.hedera.pbj.runtime.io.buffer.Bytes; +import com.hedera.pbj.runtime.io.stream.WritableStreamingData; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import picocli.CommandLine.Command; +import picocli.CommandLine.Help.Ansi; +import picocli.CommandLine.Option; + +/** + * Command line command that converts a record stream to blocks + *

+ * Example block ranges for testing: + *

    + *
  • -s 0 -e 10 - Record File v2
  • + *
  • -s 12877843 -e 12877853 - Record File v5
  • + *
  • -s 72756872 -e 72756882 - Record File v6 with sidecars
  • + *
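+ * <p>
+ * A typical invocation, as a sketch (using the {@code tool.sh} wrapper added in this PR; the flags are the options
+ * defined on this command below):
+ * <pre>
+ * ./tool.sh record2block -s 0 -e 10 -j -c
+ * </pre>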
+ * Record files start at V2 at block 0 then change to V5 at block 12370838 and V6 at block 38210031 + */ +@SuppressWarnings({"FieldCanBeLocal", "CallToPrintStackTrace"}) +@Command(name = "record2block", description = "Converts a record stream files into blocks") +public class Record2BlockCommand implements Runnable { + + @Option( + names = {"-s", "--start-block"}, + description = "The block to start converting from") + private int startBlock = 0; + + @Option( + names = {"-e", "--end-block"}, + description = "The block to end converting at") + private int endBlock = 3001; + + @Option( + names = {"-j", "--json"}, + description = "also output blocks as json") + private boolean jsonEnabled = false; + + @Option( + names = {"-c", "--cache-enabled"}, + description = "Use local cache for downloaded content") + private boolean cacheEnabled = false; + + @Option( + names = {"--min-node-account-id"}, + description = "the account id of the first node in the network") + private int minNodeAccountId = 3; + + @Option( + names = {"--max-node-account-id"}, + description = "the account id of the last node in the network") + private int maxNodeAccountId = 34; + + @Option( + names = {"-d", "--data-dir"}, + description = "the data directory for output and temporary files") + private Path dataDir = Path.of("data"); + + /** The path to the block times file. */ + @Option( + names = {"--block-times"}, + description = "Path to the block times \".bin\" file.") + private Path blockTimesFile = Path.of("data/block_times.bin"); + + /** + * Path to the output blocks directory + */ + private Path blocksDir; + + /** + * Path to the output json blocks directory + */ + private Path blocksJsonDir; + + /** + * Empty Default constructor to remove JavaDoc warning + */ + public Record2BlockCommand() {} + + /** + * Main method to run the command + */ + @Override + public void run() { + // create executor service + try (final ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor(); + final ExecutorService singleThreadWritingExecutor = Executors.newSingleThreadExecutor()) { + blocksDir = dataDir.resolve("blocks"); + blocksJsonDir = dataDir.resolve("blocks-json"); + // enable cache, disable if doing large batches + final MainNetBucket mainNetBucket = + new MainNetBucket(cacheEnabled, dataDir.resolve("gcp-cache"), minNodeAccountId, maxNodeAccountId); + // create blocks dir + Files.createDirectories(blocksDir); + if (jsonEnabled) { + Files.createDirectories(blocksJsonDir); + } + // check start block is less than end block + if (startBlock > endBlock) { + throw new IllegalArgumentException("Start block must be less than end block"); + } + // check blockTimesFile exists + if (!Files.exists(blockTimesFile)) { + throw new IllegalArgumentException("Block times file does not exist: " + blockTimesFile); + } + // map the block_times.bin file + final BlockTimes blockTimes = new BlockTimes(blockTimesFile); + // get previous block hash + Bytes previousBlockHash; + if (startBlock == 0) { + previousBlockHash = Bytes.wrap(new byte[48]); // empty hash for first block + } else { + // get previous block hash from mirror node + previousBlockHash = getPreviousHashForBlock(startBlock); + } + // iterate over the blocks + Instant currentHour = null; + List currentHoursFiles = null; + for (int blockNumber = startBlock; blockNumber <= endBlock; blockNumber++) { + final int finalBlockNumber = blockNumber; + // get the time of the record file for this block, from converted mirror node data + final long blockTime = 
blockTimes.getBlockTime(blockNumber); + final Instant blockTimeInstant = blockTimeLongToInstant(blockTime); + System.out.printf( + Ansi.AUTO.string("@|bold,green,underline Processing block|@ %d" + + " @|green at blockTime|@ %s" + + " @|cyan Progress = block %d of %d" + " = %.2f%% |@\n"), + blockNumber, + blockTimeInstant, + blockNumber - startBlock + 1, + endBlock - startBlock + 1, + ((double) (blockNumber - startBlock) / (double) (endBlock - startBlock)) * 100d); + // round instant to nearest hour + Instant blockTimeHour = blockTimeInstant.truncatedTo(ChronoUnit.HOURS); + // check if we are the same hour as last block, if not load the new hour + if (currentHour == null || !currentHour.equals(blockTimeHour)) { + currentHour = blockTimeHour; + currentHoursFiles = mainNetBucket.listHour(blockTime); + System.out.println(Ansi.AUTO.string( + "\r@|bold,yellow Listed " + currentHoursFiles.size() + " files from GCP|@")); + } + // create block info + BlockInfo blockInfo = new BlockInfo( + blockNumber, + blockTime, + currentHoursFiles.stream() + .filter(cf -> cf.blockTime() == blockTime) + .toList()); + // print block info + System.out.println(" " + blockInfo); + + // The next 3 steps we do in background threads as they all download files from GCP which can be slow + + // now we need to download the most common record file & parse version information out of record file + final Future recordFileInfoFuture = executorService.submit(() -> RecordFileInfo.parse( + blockInfo.mostCommonRecordFile().chainFile().download(mainNetBucket))); + + // download and parse all signature files then convert signature files to list of RecordFileSignatures + final List> recordFileSignatureFutures = blockInfo.signatureFiles().stream() + .map(cf -> executorService.submit(() -> { + final ParsedSignatureFile sigFile = ParsedSignatureFile.downloadAndParse(cf, mainNetBucket); + return new RecordFileSignature(Bytes.wrap(sigFile.signature()), sigFile.nodeId()); + })) + .toList(); + + // download most common sidecar files, one for each numbered sidecar + final List> sideCarsFutures = blockInfo.sidecarFiles().values().stream() + .map(sidecarFile -> executorService.submit(() -> { + byte[] sidecarFileBytes = sidecarFile + .mostCommonSidecarFile() + .chainFile() + .download(mainNetBucket); + try { + return SidecarFile.PROTOBUF.parse(Bytes.wrap(sidecarFileBytes)); + } catch (ParseException e) { + throw new RuntimeException(e); + } + })) + .toList(); + + // collect all background computed data from futures + final RecordFileInfo recordFileVersionInfo = recordFileInfoFuture.get(); + final List recordFileSignatures = getResults(recordFileSignatureFutures); + final List sideCars = getResults(sideCarsFutures); + + // build new block + final BlockHeader blockHeader = new BlockHeader( + recordFileVersionInfo.hapiProtoVersion(), + recordFileVersionInfo.hapiProtoVersion(), + blockNumber, + previousBlockHash, + new Timestamp(blockTimeInstant.getEpochSecond(), blockTimeInstant.getNano()), + BlockHashAlgorithm.SHA2_384); + final RecordFileItem recordFileItem = new RecordFileItem( + new Timestamp(blockTimeInstant.getEpochSecond(), blockTimeInstant.getNano()), + Bytes.wrap(recordFileVersionInfo.recordFileContents()), + sideCars, + recordFileSignatures); + final Block block = new Block(List.of( + new BlockItem(new OneOf<>(ItemOneOfType.BLOCK_HEADER, blockHeader)), + new BlockItem(new OneOf<>(ItemOneOfType.RECORD_FILE, recordFileItem)))); + + // write block to disk on a single threaded executor. 
This allows the loop to carry on and start + // downloading files for the next block. We should be download bound so optimizing to keep the queue of + // downloads as busy as possible. + singleThreadWritingExecutor.submit(() -> { + try { + final BlockPath blockPath = writeBlock(blocksDir, block); + // write as json for now as well + if (jsonEnabled) { + final Path blockJsonPath = blocksJsonDir.resolve(blockPath.blockNumStr() + ".blk.json"); + Files.createDirectories(blockJsonPath.getParent()); + try (WritableStreamingData out = new WritableStreamingData(Files.newOutputStream( + blockJsonPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE))) { + Block.JSON.write(block, out); + } + System.out.println(Ansi.AUTO.string( + "@|bold,yellow Wrote block [|@" + finalBlockNumber + "@|bold,yellow ]to|@ " + + blockPath.dirPath() + + "/" + blockPath.zipFileName() + "@|bold,cyan :|@" + + blockPath.blockFileName() + "@|bold,yellow ] and json to|@ " + + blockJsonPath)); + } else { + System.out.println(Ansi.AUTO.string( + "@|bold,yellow Wrote block [|@" + finalBlockNumber + "@|bold,yellow ]to|@ " + + blockPath.dirPath() + + "/" + blockPath.zipFileName() + "@|bold,cyan :|@" + + blockPath.blockFileName())); + } + } catch (IOException e) { + e.printStackTrace(); + System.exit(1); + } + }); + // update previous block hash + previousBlockHash = recordFileVersionInfo.blockHash(); + } + } catch (IOException | InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + /** + * Get list results of a list of future + * + * @param futures list of futures to collect results from + * @return list of results + * @param the type of the future + */ + private static List getResults(List> futures) { + try { + List results = new ArrayList<>(futures.size()); + for (Future future : futures) { + results.add(future.get()); + } + return results; + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } +} diff --git a/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/gcp/AddNewerBlockTimes.java b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/gcp/AddNewerBlockTimes.java new file mode 100644 index 000000000..a9523821e --- /dev/null +++ b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/gcp/AddNewerBlockTimes.java @@ -0,0 +1,148 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.hedera.block.tools.commands.record2blocks.gcp; + +import static com.hedera.block.tools.commands.record2blocks.util.RecordFileDates.blockTimeLongToInstant; +import static com.hedera.block.tools.commands.record2blocks.util.RecordFileDates.extractRecordFileTime; + +import com.hedera.block.tools.commands.record2blocks.mirrornode.FetchBlockQuery; +import com.hedera.block.tools.commands.record2blocks.util.RecordFileDates; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; +import java.util.List; +import picocli.CommandLine.Command; +import picocli.CommandLine.Help.Ansi; +import picocli.CommandLine.Option; + +/** + * Add block times for blocks newer than mirror node data from GCP + */ +@SuppressWarnings("FieldCanBeLocal") +@Command(name = "addNewerBlockTimes", description = "Add block times for blocks newer than mirror node data from GCP") +public class AddNewerBlockTimes implements Runnable { + @Option( + names = {"-c", "--cache-enabled"}, + description = "Use local cache for downloaded content") + private boolean cacheEnabled = true; + + @Option( + names = {"--min-node-account-id"}, + description = "the account id of the first node in the network") + private int minNodeAccountId = 3; + + @Option( + names = {"--max-node-account-id"}, + description = "the account id of the last node in the network") + private int maxNodeAccountId = 34; + + @Option( + names = {"-d", "--data-dir"}, + description = "the data directory for output and temporary files") + private Path dataDir = Path.of("data"); + + /** The path to the block times file. */ + @Option( + names = {"--block-times"}, + description = "Path to the block times \".bin\" file.") + private Path blockTimesFile = Path.of("data/block_times.bin"); + + /** + * Add block times for blocks newer than mirror node data from GCP. This is done by listing the record files and + * sorting and counting them. 
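+ * Each newly discovered record file appends one long (its consensus time in nanoseconds since the first block) to the
+ * end of the file, preserving the invariant that a block time's file offset divided by {@code Long.BYTES} is its
+ * block number.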
+ */ + @Override + public void run() { + try { + System.out.println( + Ansi.AUTO.string("@|bold,green AddNewerBlockTimes - reading existing block times file data|@")); + System.out.println(Ansi.AUTO.string("@|yellow blockTimesFile =|@ " + blockTimesFile)); + final long binFileSize = Files.size(blockTimesFile); + // get last block number + final int lastBlockNumberInFile = (int) (binFileSize / Long.BYTES) - 1; + System.out.println(Ansi.AUTO.string("@|yellow lastBlockNumberInFile =|@ " + lastBlockNumberInFile)); + // read last long in block_times.bin + final long lastBlockTime; + try (RandomAccessFile raf = new RandomAccessFile(blockTimesFile.toFile(), "r")) { + raf.seek(binFileSize - Long.BYTES); + lastBlockTime = raf.readLong(); + } + System.out.println(Ansi.AUTO.string("@|yellow lastBlockTime = |@" + lastBlockTime + " nanos @|yellow =|@ " + + RecordFileDates.blockTimeLongToInstant(lastBlockTime))); + // Open connection to get data from mainnet GCP bucket + final MainNetBucket mainNetBucket = + new MainNetBucket(cacheEnabled, dataDir.resolve("gcp-cache"), minNodeAccountId, maxNodeAccountId); + + // find the day containing the last block time + final Instant lastBlockTimeInstant = blockTimeLongToInstant(lastBlockTime); + final Instant lastBlockDay = lastBlockTimeInstant.truncatedTo(ChronoUnit.DAYS); + System.out.println(Ansi.AUTO.string("@|yellow lastBlockDay = |@" + lastBlockDay)); + // create date formatter for output + DateTimeFormatter dateFormatter = + DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("UTC")); + // loop over days from lastBlockDay to today + Instant day = lastBlockDay; + long blockNumber = lastBlockNumberInFile; + try (RandomAccessFile raf = new RandomAccessFile(blockTimesFile.toFile(), "rw")) { + raf.seek(raf.length()); + while (day.isBefore(Instant.now().truncatedTo(ChronoUnit.DAYS))) { + System.out.println( + Ansi.AUTO.string("@|bold,green,underline Processing day |@" + dateFormatter.format(day))); + // get listing of all files in bucket for current day + final List allFilesInDay = + mainNetBucket.listDayFileNames(RecordFileDates.blockTimeInstantToLong(day)); + for (String recordFileName : allFilesInDay) { + final Instant fileTime = extractRecordFileTime(recordFileName); + if (fileTime.isAfter(lastBlockTimeInstant)) { + blockNumber++; + if (blockNumber < (lastBlockNumberInFile + 5) || blockNumber % 1_000 == 0) { + System.out.println(Ansi.AUTO.string( + "@|yellow blockNumber = |@" + blockNumber + " @|yellow fileTime = |@" + + fileTime + " @|yellow recordFileName = |@" + + recordFileName)); + } + final long currentBlockTime = RecordFileDates.instantToBlockTimeLong(fileTime); + // append block time to block_times.bin + raf.writeLong(currentBlockTime); + } + } + // flush the file + raf.getChannel().force(false); + // double check last block number once each day + String blockFileNameFromMirrorNode = FetchBlockQuery.getRecordFileNameForBlock(blockNumber); + String lastRecordFileName = allFilesInDay.getLast(); + System.out.println(Ansi.AUTO.string("@|cyan,bold Checking|@ @|yellow lastBlockNumberOfDay = |@" + + blockNumber + " @|yellow blockFileNameFromMirrorNode = |@" + + blockFileNameFromMirrorNode + " @|yellow lastRecordFileName = |@" + + lastRecordFileName)); + if (!blockFileNameFromMirrorNode.equals(lastRecordFileName)) { + throw new RuntimeException("Last block of day number mismatch"); + } + // next day + day = day.plus(1, ChronoUnit.DAYS); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git 
a/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/gcp/MainNetBucket.java b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/gcp/MainNetBucket.java new file mode 100644 index 000000000..d4731905d --- /dev/null +++ b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/gcp/MainNetBucket.java @@ -0,0 +1,314 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.tools.commands.record2blocks.gcp; + +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.Bucket; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.Storage.BlobField; +import com.google.cloud.storage.Storage.BlobListOption; +import com.google.cloud.storage.StorageOptions; +import com.hedera.block.tools.commands.record2blocks.model.ChainFile; +import com.hedera.block.tools.commands.record2blocks.util.RecordFileDates; +import java.io.ByteArrayInputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +/** + * A class to list and download files from the mainnet bucket. This is designed to be thread safe. + *

+ * Example bucket paths + *

    + *
  • gs://hedera-mainnet-streams/recordstreams/record0.0.3/2019-09-13T21_53_51.396440Z.rcd
  • + *
  • gs://hedera-mainnet-streams/recordstreams/record0.0.3/sidecar/2023-04-25T17_42_16.032498578Z_01.rcd.gz
  • + *
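+ * <p>
+ * Minimal usage sketch (constructor arguments here are illustrative, mirroring the record2block defaults; the path is
+ * the example above):
+ * <pre>
+ * MainNetBucket bucket = new MainNetBucket(true, Path.of("data/gcp-cache"), 3, 34);
+ * byte[] recordFileBytes = bucket.download("recordstreams/record0.0.3/2019-09-13T21_53_51.396440Z.rcd");
+ * </pre>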
+ */ +public class MainNetBucket { + /** The required fields we need from blobs */ + private static final Storage.BlobListOption REQUIRED_FIELDS = + BlobListOption.fields(BlobField.NAME, BlobField.SIZE, BlobField.MD5HASH); + /** Blob name field only */ + private static final Storage.BlobListOption NAME_FIELD_ONLY = BlobListOption.fields(BlobField.NAME); + /** The mainnet bucket name*/ + private static final String HEDERA_MAINNET_STREAMS_BUCKET = "hedera-mainnet-streams"; + /** The mainnet bucket GCP API instance */ + private static final Bucket STREAMS_BUCKET = + StorageOptions.getDefaultInstance().getService().get(HEDERA_MAINNET_STREAMS_BUCKET); + + /** + * The cache enabled switch. When caching is enabled all fetched data is saved on disk and reused between runs. This + * is useful for debugging and development. But when we get to doing long runs with many TB of data this is not + * practical. + */ + private final boolean cacheEnabled; + + /** The cache directory, where we store all downloaded content for reuse if CACHE_ENABLED is true. */ + private final Path cacheDir; // = DATA_DIR.resolve("gcp-cache"); + + /** The minimum node account id in the network. */ + private final int minNodeAccountId; + + /** The maximum node account id in the network. */ + private final int maxNodeAccountId; + /** + * Create a new MainNetBucket instance with the given cache enabled switch and cache directory. + * + * @param cacheEnabled the cache enabled switch + * @param cacheDir the cache directory + * @param minNodeAccountId the minimum node account id in the network + * @param maxNodeAccountId the maximum node account id in the network + */ + public MainNetBucket(boolean cacheEnabled, Path cacheDir, int minNodeAccountId, int maxNodeAccountId) { + this.cacheEnabled = cacheEnabled; + this.cacheDir = cacheDir; + this.minNodeAccountId = minNodeAccountId; + this.maxNodeAccountId = maxNodeAccountId; + } + + /** + * Download a file from GCP, caching if CACHE_ENABLED is true. This is designed to be thread safe. + * + * @param path the path to the file in the bucket + * @return the bytes of the file + */ + public byte[] download(String path) { + try { + final Path cachedFilePath = cacheDir.resolve(path); + byte[] rawBytes; + if (cacheEnabled && Files.exists(cachedFilePath)) { + rawBytes = Files.readAllBytes(cachedFilePath); + } else { + rawBytes = STREAMS_BUCKET.get(path).getContent(); + if (cacheEnabled) { + Files.createDirectories(cachedFilePath.getParent()); + Path tempCachedFilePath = Files.createTempFile(cacheDir, null, ".tmp"); + Files.write(tempCachedFilePath, rawBytes); + Files.move(tempCachedFilePath, cachedFilePath); + } + } + // if file is gzipped, unzip it + if (path.endsWith(".gz")) { + try (GZIPInputStream gzipInputStream = new GZIPInputStream(new ByteArrayInputStream(rawBytes))) { + return gzipInputStream.readAllBytes(); + } + } else { + return rawBytes; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Download a file from GCP as a stream, caching if CACHE_ENABLED is true. This is designed to be thread safe. 
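+ * Note that, unlike {@link #download(String)}, gzipped ".gz" content is not decompressed here; the raw bytes as
+ * stored in the bucket are returned.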
+ * + * @param path the path to the file in the bucket + * @return the stream of the file + */ + public java.io.InputStream downloadStreaming(String path) { + try { + Path cachedFilePath = cacheDir.resolve(path); + if (cacheEnabled && Files.exists(cachedFilePath)) { + return Files.newInputStream(cachedFilePath, StandardOpenOption.READ); + } else { + final byte[] bytes = STREAMS_BUCKET.get(path).getContent(); + if (cacheEnabled) { + Files.createDirectories(cachedFilePath.getParent()); + Path tempCachedFilePath = Files.createTempFile(cacheDir, null, ".tmp"); + Files.write(tempCachedFilePath, bytes); + Files.move(tempCachedFilePath, cachedFilePath); + } + return new ByteArrayInputStream(bytes); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * List all the ChainFiles in the bucket that have a start time within the given day that contains the given + * blockStartTime. This fetches blobs for all record files, signature files and sidecar files. For all nodes. + * + * @param blockStartTime the start time of a block, in nanoseconds since OA + * @return a stream of ChainFiles that contain the records for the given day. + */ + @SuppressWarnings("unused") + public List listDay(long blockStartTime) { + final String datePrefix = RecordFileDates.blockTimeLongToRecordFilePrefix(blockStartTime); + // crop to the hour + final String dayPrefix = datePrefix.substring(0, datePrefix.indexOf('T')); + return listWithFilePrefix(dayPrefix); + } + + /** + * List all the record file names in the bucket that have a start time within the given day that contains the given + * blockStartTime. + * + * @param blockStartTime the start time of a block, in nanoseconds since OA + * @return a list of unique names of all record files starting with the given file name prefix. + */ + public List listDayFileNames(long blockStartTime) { + final String datePrefix = RecordFileDates.blockTimeLongToRecordFilePrefix(blockStartTime); + // crop to the hour + final String dayPrefix = datePrefix.substring(0, datePrefix.indexOf('T')); + return listNamesWithFilePrefix(dayPrefix); + } + + /** + * List all the ChainFiles in the bucket that have a start time within the given hour that contains the given + * blockStartTime. This fetches blobs for all record files, signature files and sidecar files. For all nodes. + * + * @param blockStartTime the start time of a block, in nanoseconds since OA + * @return a stream of ChainFiles that contain the records for the given hour. + */ + public List listHour(long blockStartTime) { + final String datePrefix = RecordFileDates.blockTimeLongToRecordFilePrefix(blockStartTime); + // crop to the hour + final String hourPrefix = datePrefix.substring(0, datePrefix.indexOf('_')); + return listWithFilePrefix(hourPrefix); + } + + /** + * List all the ChainFiles in the bucket that have this file name prefix. This fetches blobs for all record files, + * signature files and sidecar files. For all nodes. + * + * @param filePrefix the prefix of the file name to search for + * @return a stream of ChainFiles that have a filename that starts with the given prefix. 
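+ * When caching is enabled, the listing is serialized to "list-<filePrefix>.bin.gz" in the cache directory and reused
+ * on subsequent runs.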
+ */ + private List listWithFilePrefix(String filePrefix) { + try { + // read from cache if it already exists in cache + final Path listCacheFilePath = cacheDir.resolve("list-" + filePrefix + ".bin.gz"); + if (cacheEnabled && Files.exists(listCacheFilePath)) { + try (ObjectInputStream ois = + new ObjectInputStream(new GZIPInputStream(Files.newInputStream(listCacheFilePath)))) { + final int fileCount = ois.readInt(); + final List chainFiles = new ArrayList<>(fileCount); + for (int i = 0; i < fileCount; i++) { + chainFiles.add((ChainFile) ois.readObject()); + } + return chainFiles; + } + } + // create a list of ChainFiles + List chainFiles = IntStream.range(minNodeAccountId, maxNodeAccountId + 1) + .parallel() + .mapToObj(nodeAccountId -> Stream.concat( + STREAMS_BUCKET + .list( + BlobListOption.prefix("recordstreams/record0.0." + nodeAccountId + + "/" + filePrefix), + REQUIRED_FIELDS) + .streamAll(), + STREAMS_BUCKET + .list( + BlobListOption.prefix("recordstreams/record0.0." + nodeAccountId + + "/sidecar/" + filePrefix), + REQUIRED_FIELDS) + .streamAll()) + .map(blob -> new ChainFile( + nodeAccountId, + blob.getName(), + blob.getSize().intValue(), + blob.getMd5()))) + .flatMap(Function.identity()) + .toList(); + // save the list to cache + if (cacheEnabled) { + Files.createDirectories(listCacheFilePath.getParent()); + try (ObjectOutputStream oos = + new ObjectOutputStream(new GZIPOutputStream(Files.newOutputStream(listCacheFilePath)))) { + oos.writeInt(chainFiles.size()); + for (ChainFile chainFile : chainFiles) { + oos.writeObject(chainFile); + } + } + } + // return all the streams combined + return chainFiles; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * List all the record file names in the bucket that have this file name prefix. This fetches blobs for all record files, + * signature files and sidecar files. For all nodes. + * + * @param filePrefix the prefix of the file name to search for + * @return a list of unique names of all record files starting with the given file name prefix. + */ + private List listNamesWithFilePrefix(String filePrefix) { + try { + // read from cache if it already exists in cache + final Path listCacheFilePath = cacheDir.resolve("list-names-" + filePrefix + ".txt.gz"); + if (cacheEnabled && Files.exists(listCacheFilePath)) { + try (var lineStream = Files.lines(listCacheFilePath)) { + return lineStream.toList(); + } + } + // create a list of ChainFiles + List fileNames = IntStream.range(minNodeAccountId, maxNodeAccountId + 1) + .parallel() + .mapToObj(nodeAccountId -> STREAMS_BUCKET + .list( + BlobListOption.prefix( + "recordstreams/record0.0." + nodeAccountId + "/" + filePrefix), + NAME_FIELD_ONLY) + .streamAll() + .map(BlobInfo::getName) + .map(name -> name.substring(name.lastIndexOf('/') + 1)) + .filter(name -> name.endsWith(".rcd") || name.endsWith(".rcd.gz"))) + .flatMap(Function.identity()) + .sorted() + .distinct() + .toList(); + // save the list to cache + if (cacheEnabled) { + Files.createDirectories(listCacheFilePath.getParent()); + Files.write(listCacheFilePath, fileNames); + } + // return all the file names + return fileNames; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * The main method test listing all the files in the bucket for the given day and hour. 
+ */ + public static void main(String[] args) { + MainNetBucket mainNetBucket = new MainNetBucket(true, Path.of("data/gcp-cache"), 0, 34); + mainNetBucket.listHour(0).forEach(System.out::println); + System.out.println("=========================================================================="); + final Instant dec1st2024 = Instant.parse("2024-12-01T00:00:00Z"); + final long dec1st2024BlockTime = RecordFileDates.instantToBlockTimeLong(dec1st2024); + mainNetBucket.listHour(dec1st2024BlockTime).forEach(System.out::println); + } +} diff --git a/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/mirrornode/ExtractBlockTimes.java b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/mirrornode/ExtractBlockTimes.java new file mode 100644 index 000000000..3d0316a8b --- /dev/null +++ b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/mirrornode/ExtractBlockTimes.java @@ -0,0 +1,122 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.tools.commands.record2blocks.mirrornode; + +import static com.hedera.block.tools.commands.record2blocks.util.RecordFileDates.recordFileNameToBlockTimeLong; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.InputStreamReader; +import java.nio.ByteBuffer; +import java.nio.LongBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.zip.GZIPInputStream; +import picocli.CommandLine.Command; +import picocli.CommandLine.Option; + +/** + * Read the record_file.csv.gz file from mirror node and extract the block times into a file. + *

+ * The block times file is a binary file of longs, each long is the number of nanoseconds for that block after first + * block time. So first block = 0, second about 5 seconds later etc. The index is the block number, so block 0 is first + * long, block 1 is second block and so on. + *

+ */ +@SuppressWarnings({"DuplicatedCode", "CallToPrintStackTrace"}) +@Command(name = "extractBlockTimes", description = "Extract block times from mirror node records csv file") +public class ExtractBlockTimes implements Runnable { + /** the number of blocks in the record CSV file roughly, used for progress estimation */ + private static final int NUMBER_OF_BLOCKS_ROUNDED_UP = 70_000_000; + + /** The path to the record table CSV from mirror node, gzipped. */ + @Option( + names = {"--record-csv"}, + description = "Path to the record table CSV from mirror node, gzipped.") + private Path recordsCsvFile = Path.of("data/record_file.csv.gz"); + + /** The path to the block times file. */ + @Option( + names = {"--block-times"}, + description = "Path to the block times \".bin\" file.") + private Path blockTimesFile = Path.of("data/block_times.bin"); + + /** + * Read the record file table CSV file and extract the block times into a file. + */ + @Override + public void run() { + // get the start time of the first block + // create off heap array to store the block times + final ByteBuffer blockTimesBytes = ByteBuffer.allocateDirect(NUMBER_OF_BLOCKS_ROUNDED_UP * Long.BYTES); + final LongBuffer blockTimes = blockTimesBytes.asLongBuffer(); + // count the number of blocks to print progress + final AtomicInteger blockCount = new AtomicInteger(0); + // read the record file table CSV file + try (var reader = new BufferedReader( + new InputStreamReader(new GZIPInputStream(new FileInputStream(recordsCsvFile.toFile()))))) { + // skip header + reader.readLine(); + // read all lines + reader.lines().parallel().forEach(line -> { + final String[] parts = line.split(","); + final String recordStreamFileName = parts[0]; + final int blockNumber = Integer.parseInt(parts[15]); + // compute nanoseconds since the first block + final long nanoseconds = recordFileNameToBlockTimeLong(recordStreamFileName); + // write the block time to the off heap array + blockTimes.put(blockNumber, nanoseconds); + // print progress + int currentBlockCount = blockCount.incrementAndGet(); + if (currentBlockCount % 100_000 == 0) { + System.out.printf( + "\rblock %,10d - %2.1f%% complete", + currentBlockCount, (currentBlockCount / 70_000_000f) * 100); + } + }); + System.out.println("\nTotal blocks read = " + blockCount.get()); + // set limit to the number of blocks read + final int totalBlockTimesBytes = blockCount.get() * Long.BYTES; + blockTimesBytes.limit(totalBlockTimesBytes); + blockTimes.limit(blockCount.get()); + // scan the block times to find any blocks missing times + long totalBlocksWithoutTimes = 0; + blockTimes.position(0); + for (int i = 0; i < blockTimes.limit(); i++) { + if (blockTimes.get(i) == 0) { + totalBlocksWithoutTimes++; + System.out.println("block[" + i + "] is missing time - blockTimes[" + blockTimes.get(i) + "] = "); + } + } + System.out.println("\ntotalBlocksWithoutTimes = " + totalBlocksWithoutTimes); + // write the block times to a file + try (final var out = + Files.newByteChannel(blockTimesFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { + blockTimesBytes.position(0); + long bytesWritten = out.write(blockTimesBytes); + System.out.println("bytesWritten = " + bytesWritten); + if (bytesWritten != totalBlockTimesBytes) { + System.out.println("ERROR: bytesWritten != totalBlockTimesBytes[" + totalBlockTimesBytes + "]"); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/mirrornode/FetchBlockQuery.java 
b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/mirrornode/FetchBlockQuery.java
new file mode 100644
index 000000000..23ac92f9e
--- /dev/null
+++ b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/mirrornode/FetchBlockQuery.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright (C) 2024 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hedera.block.tools.commands.record2blocks.mirrornode;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.hedera.pbj.runtime.io.buffer.Bytes;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.URI;
+import java.net.URL;
+import java.util.HexFormat;
+
+/**
+ * Query the mirror node and fetch block information.
+ */
+public class FetchBlockQuery {
+
+    /**
+     * Get the record file name for a block number from the mirror node.
+     *
+     * @param blockNumber the block number
+     * @return the record file name
+     */
+    public static String getRecordFileNameForBlock(long blockNumber) {
+        final String url = "https://mainnet-public.mirrornode.hedera.com/api/v1/blocks/" + blockNumber;
+        final JsonObject json = readUrl(url);
+        return json.get("name").getAsString();
+    }
+
+    /**
+     * Get the previous hash for a block number from the mirror node.
+     *
+     * @param blockNumber the block number
+     * @return the previous block hash
+     */
+    public static Bytes getPreviousHashForBlock(long blockNumber) {
+        final String url = "https://mainnet-public.mirrornode.hedera.com/api/v1/blocks/" + blockNumber;
+        final JsonObject json = readUrl(url);
+        final String hashStr = json.get("previous_hash").getAsString();
+        return Bytes.wrap(HexFormat.of().parseHex(hashStr.substring(2))); // remove 0x prefix and parse
+    }
+
+    /**
+     * Read a URL and return the JSON object.
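+     * <p>The mirror node blocks endpoint returns JSON along these lines (abbreviated; values are illustrative,
+     * only the {@code name} and {@code previous_hash} fields are used here):
+     * <pre>{@code
+     * { "name": "2024-07-06T16_42_40.006863632Z.rcd.gz", "previous_hash": "0xabc123..." }
+     * }</pre>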
+ * + * @param url the URL to read + * @return the JSON object + */ + private static JsonObject readUrl(String url) { + try { + URL u = new URI(url).toURL(); + try (Reader reader = new InputStreamReader(u.openStream())) { + return new Gson().fromJson(reader, JsonObject.class); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Test main method + * + * @param args the command line arguments + */ + public static void main(String[] args) { + System.out.println("Fetching block query..."); + int blockNumber = 69333000; + System.out.println("blockNumber = " + blockNumber); + String recordFileName = getRecordFileNameForBlock(blockNumber); + System.out.println("recordFileName = " + recordFileName); + } +} diff --git a/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/mirrornode/FetchMirrorNodeRecordsCsv.java b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/mirrornode/FetchMirrorNodeRecordsCsv.java new file mode 100644 index 000000000..9a64b556b --- /dev/null +++ b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/mirrornode/FetchMirrorNodeRecordsCsv.java @@ -0,0 +1,162 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.tools.commands.record2blocks.mirrornode; + +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.ServiceOptions; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.Storage.BlobGetOption; +import com.google.cloud.storage.StorageOptions; +import java.io.BufferedOutputStream; +import java.io.FileOutputStream; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import picocli.CommandLine.Command; +import picocli.CommandLine.Option; + +/** + * Download mirror node record table CSV dump from GCP bucket + */ +@SuppressWarnings({"CallToPrintStackTrace", "unused"}) +@Command(name = "fetchRecordsCsv", description = "Download mirror node record table CSV dump from GCP bucket") +public class FetchMirrorNodeRecordsCsv implements Runnable { + /** The GCP bucket name that contains CSV dumps of mirror node */ + private static final String bucketName = "mirrornode-db-export"; + + /** The path to the record table CSV in bucket */ + private static final String objectPath = "0.113.2/record_file.csv.gz"; + + /** The path to the record table CSV from mirror node, gzipped. 
+     */
+    @Option(
+            names = {"--record-csv"},
+            description = "Path to the record table CSV from mirror node, gzipped.")
+    private Path recordsCsvFile = Path.of("data/record_file.csv.gz");
+
+    /**
+     * Download the record table CSV from the mirror node GCP bucket.
+     */
+    @Override
+    public void run() {
+        try {
+            // print error and stop if the output file already exists, before doing any network work
+            if (Files.exists(recordsCsvFile)) {
+                System.err.println("Output file already exists: " + recordsCsvFile);
+                System.exit(1);
+            }
+
+            // Load the current credentials
+            GoogleCredentials.getApplicationDefault();
+
+            // Get the project ID from the credentials
+            String projectId = ServiceOptions.getDefaultProjectId();
+
+            if (projectId != null) {
+                System.out.println("Project ID: " + projectId);
+            } else {
+                System.out.println("Project ID not found.");
+                System.exit(1);
+            }
+
+            // Instantiates a GCP Storage client
+            final Storage storage = StorageOptions.getDefaultInstance().getService();
+            // Read the object from the bucket with requester pays option
+            BlobId blobId = BlobId.of(bucketName, objectPath);
+            Blob blob = storage.get(blobId, BlobGetOption.userProject(projectId));
+            if (blob == null) {
+                System.err.println("Object not found: gs://" + bucketName + "/" + objectPath);
+                System.exit(1);
+            }
+            // create parent directories
+            //noinspection ResultOfMethodCallIgnored
+            recordsCsvFile.toFile().getParentFile().mkdirs();
+            // download file
+            try (ProgressOutputStream out = new ProgressOutputStream(
+                    new BufferedOutputStream(new FileOutputStream(recordsCsvFile.toFile()), 1024 * 1024 * 32),
+                    blob.getSize(),
+                    recordsCsvFile.getFileName().toString())) {
+                blob.downloadTo(out);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.exit(1);
+        }
+    }
+
+    /**
+     * A simple output stream that prints progress to the console.
+     */
+    public static class ProgressOutputStream extends FilterOutputStream {
+        private static final long MB = 1024 * 1024;
+        private final long size;
+        private final String name;
+        private long bytesWritten = 0;
+        /** the last whole number of megabytes for which progress was printed */
+        private long lastPrintedMb = 0;
+
+        /**
+         * Create new progress output stream.
+         *
+         * @param out the output stream to wrap
+         * @param size the expected total number of bytes that will be written
+         * @param name the name of the output stream
+         */
+        public ProgressOutputStream(OutputStream out, long size, String name) {
+            super(out);
+            this.size = size;
+            this.name = name;
+        }
+
+        /**
+         * Write a byte to the output stream.
+         *
+         * @param b the byte to write
+         * @throws IOException if an error occurs writing the byte
+         */
+        @Override
+        public void write(int b) throws IOException {
+            super.write(b);
+            bytesWritten++;
+            printProgress();
+        }
+
+        /**
+         * Write a byte array to the output stream.
+         *
+         * @param b the byte array to write
+         * @param off the offset in the byte array to start writing
+         * @param len the number of bytes to write
+         * @throws IOException if an error occurs writing the byte array
+         */
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            // write directly to the wrapped stream; FilterOutputStream.write(byte[],int,int) would call
+            // write(int) once per byte and double count the progress
+            out.write(b, off, len);
+            bytesWritten += len;
+            printProgress();
+        }
+
+        /**
+         * Print the progress of the output stream to the console.
+         */
+        private void printProgress() {
+            final long mbWritten = bytesWritten / MB;
+            // only print when another whole megabyte has been written, so chunked writes still report progress
+            if (mbWritten > lastPrintedMb) {
+                lastPrintedMb = mbWritten;
+                System.out.printf(
+                        "\rProgress: %.0f%% - %,d MB written of %s",
+                        (bytesWritten / (double) size) * 100d, mbWritten, name);
+            }
+        }
+    }
+}
diff --git a/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/mirrornode/ValidateBlockTimes.java b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/mirrornode/ValidateBlockTimes.java
new file mode 100644
index 000000000..e10b28c6f
--- /dev/null
+++ b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/mirrornode/ValidateBlockTimes.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright (C) 2024 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hedera.block.tools.commands.record2blocks.mirrornode;
+
+import static com.hedera.block.tools.commands.record2blocks.util.RecordFileDates.blockTimeLongToRecordFilePrefix;
+
+import com.hedera.block.tools.commands.record2blocks.model.BlockTimes;
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.nio.file.Path;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.GZIPInputStream;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+
+/**
+ * Validate the block times in the block_times.bin file by comparing them to the record file names in CSV.
+ */
+@Command(name = "validateBlockTimes", description = "Validates a block times file")
+public class ValidateBlockTimes implements Runnable {
+
+    /** The path to the record table CSV from mirror node, gzipped. */
+    @Option(
+            names = {"--record-csv"},
+            description = "Path to the record table CSV from mirror node, gzipped.")
+    private Path recordsCsvFile = Path.of("data/record_file.csv.gz");
+
+    /** The path to the block times file. */
+    @Option(
+            names = {"--block-times"},
+            description = "Path to the block times \".bin\" file.")
+    private Path blockTimesFile = Path.of("data/block_times.bin");
+
+    /**
+     * Read the record file table CSV file and validate the block times in the block times file.
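+     * <p>The check itself is a simple prefix comparison (sketch of the logic used below):
+     * <pre>{@code
+     * String prefix = blockTimeLongToRecordFilePrefix(blockTimes.getBlockTime(blockNumber));
+     * boolean ok = recordStreamFileName.startsWith(prefix);
+     * }</pre>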
+ */ + @Override + public void run() { + try { + final BlockTimes blockTimes = new BlockTimes(blockTimesFile); + // count the number of blocks to print progress + final AtomicInteger blockCount = new AtomicInteger(0); + // read the record file table CSV file + try (var reader = new BufferedReader( + new InputStreamReader(new GZIPInputStream(new FileInputStream(recordsCsvFile.toFile()))))) { + // skip header + reader.readLine(); + // read all lines + reader.lines().parallel().forEach(line -> { + final String[] parts = line.split(","); + final String recordStreamFileName = parts[0]; + final int blockNumber = Integer.parseInt(parts[15]); + // get the block offset from blockTimes + final long blockTime = blockTimes.getBlockTime(blockNumber); + // convert the block time to a string without 'Z' on the end + String blockTimeString = blockTimeLongToRecordFilePrefix(blockTime); + // check the file name starts with the block time + if (!recordStreamFileName.startsWith(blockTimeString)) { + System.err.printf( + "Block %d has incorrect time %s should be %s%n", + blockNumber, recordStreamFileName, blockTimeString); + } + // print progress + int currentBlockCount = blockCount.incrementAndGet(); + if (currentBlockCount % 100_000 == 0) { + System.out.printf( + "\rblock %,10d - %2.1f%% complete", + currentBlockCount, (currentBlockCount / 70_000_000f) * 100); + } + }); + } + } catch (Exception e) { + //noinspection CallToPrintStackTrace + e.printStackTrace(); + System.exit(1); + } + } +} diff --git a/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/BlockInfo.java b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/BlockInfo.java new file mode 100644 index 000000000..b0a28a653 --- /dev/null +++ b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/BlockInfo.java @@ -0,0 +1,141 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.tools.commands.record2blocks.model; + +import com.hedera.block.tools.commands.record2blocks.model.ChainFile.Kind; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.stream.Collectors; +import picocli.CommandLine.Help.Ansi; + +/** + * BlockInfo represents a Hedera block with its associated record files, sidecar files and signature files. 
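+ * <p>Copies of each file are collected from many nodes; the copies are grouped by MD5 hash and the most frequent
+ * one is treated as canonical, along the lines of (sketch of the grouping used below):
+ * <pre>{@code
+ * Map<String, Long> md5Counts = allBlockFiles.stream()
+ *         .filter(cf -> cf.kind() == Kind.RECORD)
+ *         .collect(Collectors.groupingBy(ChainFile::md5, Collectors.counting()));
+ * }</pre>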
+ *
+ * @param blockNum the block number
+ * @param blockTime the block time
+ * @param recordFiles the record files associated with the block
+ * @param mostCommonRecordFile the record file with the most occurrences
+ * @param sidecarFiles the sidecar files associated with the block
+ * @param signatureFiles the signature files associated with the block
+ */
+@SuppressWarnings("unused")
+public record BlockInfo(
+        long blockNum,
+        long blockTime,
+        List<ChainFile> recordFiles,
+        ChainFileAndCount mostCommonRecordFile,
+        SortedMap<Integer, NumberedSidecarFile> sidecarFiles,
+        List<ChainFile> signatureFiles) {
+
+    /**
+     * Create a new BlockInfo instance by passing in all files associated with the block. They are then divided into
+     * record files, sidecar files and signature files.
+     *
+     * @param blockNum the block number
+     * @param blockTime the block time
+     * @param allBlockFiles all files associated with the block
+     */
+    public BlockInfo(long blockNum, long blockTime, List<ChainFile> allBlockFiles) {
+        this(
+                blockNum,
+                blockTime,
+                allBlockFiles.stream().filter(cf -> cf.kind() == Kind.RECORD).collect(Collectors.toList()),
+                mostCommonRecordFileMd5EntryAndCount(allBlockFiles),
+                collectSidecarFiles(allBlockFiles),
+                allBlockFiles.stream().filter(cf -> cf.kind() == Kind.SIGNATURE).collect(Collectors.toList()));
+    }
+
+    /**
+     * Find the record file with the most occurrences in the list of all block files. This works on the assumption that
+     * the record file with the most occurrences is the one that is most likely to be the correct record file.
+     *
+     * @param allBlockFiles all files associated with the block
+     * @return the record file with the most occurrences and the number of occurrences
+     */
+    private static ChainFileAndCount mostCommonRecordFileMd5EntryAndCount(List<ChainFile> allBlockFiles) {
+        final Map<String, Long> md5Counts = allBlockFiles.stream()
+                .filter(cf -> cf.kind() == Kind.RECORD)
+                .collect(Collectors.groupingBy(ChainFile::md5, Collectors.counting()));
+        final var maxCountEntry =
+                md5Counts.entrySet().stream().max(Map.Entry.comparingByValue()).orElse(null);
+        if (maxCountEntry == null) {
+            throw new IllegalStateException("No record files found");
+        }
+        final ChainFile maxCountRecordFile = allBlockFiles.stream()
+                .filter(cf -> cf.md5().equals(maxCountEntry.getKey()))
+                .findFirst()
+                .orElse(null);
+        return new ChainFileAndCount(
+                maxCountRecordFile, maxCountEntry.getValue().intValue());
+    }
+
+    /**
+     * Collect sidecar files from all block files. There can be multiple sidecar files for a block, each with a copy
+     * from each node. This groups them by sidecar index and returns the most common sidecar file for each index in a
+     * sorted map.
+     *
+     * @param allBlockFiles all files associated with the block
+     * @return a sorted map of sidecar files, keyed by sidecar index
+     */
+    private static SortedMap<Integer, NumberedSidecarFile> collectSidecarFiles(List<ChainFile> allBlockFiles) {
+        // group sidecar files by sidecar index
+        final Map<Integer, List<ChainFile>> sidecarFiles = allBlockFiles.stream()
+                .filter(cf -> cf.kind() == Kind.SIDECAR)
+                .collect(Collectors.groupingBy(ChainFile::sidecarIndex));
+        final TreeMap<Integer, NumberedSidecarFile> sortedSidecarFiles = new TreeMap<>();
+        sidecarFiles.forEach((sidecarIndex, sidecarFileList) ->
+                sortedSidecarFiles.put(sidecarIndex, new NumberedSidecarFile(sidecarFileList)));
+        return sortedSidecarFiles;
+    }
+
+    /** Template used for rendering to string.
+     */
+    private static final String TEMPLATE = Ansi.AUTO.string(
+            "@|bold,yellow BlockInfo{|@ @|yellow blockNum=|@$blockNum, @|yellow blockTime=|@$blockTime "
+                    + "@|bold,yellow recordFiles|@ @|yellow total=|@$recordFileCount @|yellow "
+                    + "matching=|@$recordFilesMatching @|cyan -> $recordFilePercent%|@ "
+                    + "@|bold,yellow sidecarFiles=|@ $sidecarFiles "
+                    + "@|bold,yellow signatureFiles|@ @|yellow total=|@$signatureFilesSize "
+                    + "@|bold,yellow }|@");
+
+    /**
+     * Render the block info as a string in nice colored output for the console.
+     *
+     * @return the block info as a string
+     */
+    @Override
+    public String toString() {
+        return TEMPLATE.replace("$blockNum", String.valueOf(blockNum))
+                .replace("$blockTime", String.valueOf(blockTime))
+                .replace("$recordFileCount", String.valueOf(recordFiles.size()))
+                .replace("$recordFilesMatching", String.valueOf(mostCommonRecordFile.count()))
+                .replace(
+                        "$recordFilePercent",
+                        String.valueOf(((mostCommonRecordFile.count() / (double) recordFiles.size()) * 100)))
+                .replace(
+                        "$sidecarFiles",
+                        sidecarFiles.isEmpty()
+                                ? "none"
+                                : sidecarFiles.values().stream()
+                                        .map(NumberedSidecarFile::toString)
+                                        .collect(Collectors.joining(", ")))
+                .replace("$signatureFilesSize", String.valueOf(signatureFiles.size()));
+    }
+}
diff --git a/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/BlockTimes.java b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/BlockTimes.java
new file mode 100644
index 000000000..b35410f1e
--- /dev/null
+++ b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/BlockTimes.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2024 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hedera.block.tools.commands.record2blocks.model;
+
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * Read the block times from the block_times.bin file.
+ */
+@SuppressWarnings("unused")
+public class BlockTimes {
+    /** Mapped buffer on the block_times.bin file. */
+    private final LongBuffer blockTimes;
+
+    /**
+     * Load and map the block_times.bin file into memory.
+     *
+     * @param blockTimesFile the path to the block_times.bin file
+     */
+    public BlockTimes(Path blockTimesFile) {
+        try (FileChannel fileChannel = FileChannel.open(blockTimesFile, StandardOpenOption.READ)) {
+            // map the whole file into a byte buffer, the mapping remains valid after the channel is closed
+            final ByteBuffer blockTimesBytes =
+                    fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, Files.size(blockTimesFile));
+            // wrap the ByteBuffer as a LongBuffer so we can easily read longs
+            blockTimes = blockTimesBytes.asLongBuffer();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Get the block time for the given block number.
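+     * <p>Usage sketch (assumes a block_times.bin produced by the extractBlockTimes command):
+     * <pre>{@code
+     * BlockTimes times = new BlockTimes(Path.of("data/block_times.bin"));
+     * long nanos = times.getBlockTime(1_000_000); // nanoseconds since the first block
+     * }</pre>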
+     *
+     * @param blockNumber the block number
+     * @return the block time, in nanoseconds since the first block
+     */
+    public long getBlockTime(int blockNumber) {
+        return blockTimes.get(blockNumber);
+    }
+
+    /**
+     * Get the maximum block number in the block_times.bin file.
+     *
+     * @return the maximum block number
+     */
+    public long getMaxBlockNumber() {
+        return blockTimes.capacity() - 1;
+    }
+}
diff --git a/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/ChainFile.java b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/ChainFile.java
new file mode 100644
index 000000000..3849a04c3
--- /dev/null
+++ b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/ChainFile.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright (C) 2024 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hedera.block.tools.commands.record2blocks.model;
+
+import static com.hedera.block.tools.commands.record2blocks.util.RecordFileDates.blockTimeInstantToLong;
+import static com.hedera.block.tools.commands.record2blocks.util.RecordFileDates.extractRecordFileTime;
+
+import com.hedera.block.tools.commands.record2blocks.gcp.MainNetBucket;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.regex.Pattern;
+
+/**
+ * Represents a file in the record stream blockchain.
+ *
+ * @param kind the kind of file
+ * @param nodeAccountId the node account ID
+ * @param path the path to the file in bucket
+ * @param blockTime the block time, in nanoseconds since the first block
+ * @param size the size of the file
+ * @param md5 the MD5 hash of the file
+ * @param sidecarIndex the sidecar index, if this file is a sidecar file
+ */
+public record ChainFile(
+        Kind kind, int nodeAccountId, String path, long blockTime, int size, String md5, int sidecarIndex)
+        implements Serializable {
+    /** The pattern for sidecar file numbers. */
+    private static final Pattern SIDECAR_NUMBER_PATTERN =
+            Pattern.compile("sidecar/\\d{4}-\\d{2}-\\d{2}T\\d{2}_\\d{2}_\\d{2}\\.\\d{9}Z_(\\d{2})\\.rcd\\.gz");
+
+    /**
+     * Creates a new chain file.
+     *
+     * @param nodeAccountId the node account ID
+     * @param path the path to the file in bucket
+     * @param size the size of the file
+     * @param md5 the MD5 hash of the file
+     */
+    public ChainFile(int nodeAccountId, String path, int size, String md5) {
+        this(
+                Kind.fromFilePath(path),
+                nodeAccountId,
+                path,
+                blockTimeInstantToLong(extractRecordFileTime(path.substring(path.lastIndexOf('/') + 1))),
+                size,
+                md5,
+                extractSidecarIndex(path));
+    }
+
+    /**
+     * Extracts the sidecar index from the file path. If the file is not a sidecar file, returns -1.
+     *

+     * Example: https://storage.googleapis.com/hedera-mainnet-streams/recordstreams/record0.0.34/sidecar/2024-04-04T18_03_26.007683847Z_01.rcd.gz
+     *
+     * @param filePath the file path
+     * @return the sidecar index
+     */
+    private static int extractSidecarIndex(String filePath) {
+        return SIDECAR_NUMBER_PATTERN
+                .matcher(filePath)
+                .results()
+                .map(m -> Integer.parseInt(m.group(1)))
+                .findFirst()
+                .orElse(-1);
+    }
+
+    /**
+     * Downloads the file from the bucket.
+     *
+     * @param mainNetBucket the main net bucket that contains the file
+     * @return the file as a byte array
+     */
+    public byte[] download(MainNetBucket mainNetBucket) {
+        return mainNetBucket.download(path);
+    }
+
+    /**
+     * Downloads the file from the bucket as a stream.
+     *
+     * @param mainNetBucket the main net bucket that contains the file
+     * @return the file as a stream
+     */
+    public InputStream downloadStreaming(MainNetBucket mainNetBucket) {
+        return mainNetBucket.downloadStreaming(path);
+    }
+
+    /**
+     * Enum for the kind of file.
+     */
+    public enum Kind {
+        RECORD,
+        SIGNATURE,
+        SIDECAR;
+
+        public static Kind fromFilePath(String filePath) {
+            if (filePath.contains("sidecar")) {
+                return SIDECAR;
+            } else if (filePath.endsWith(".rcd") || filePath.endsWith(".rcd.gz")) {
+                return RECORD;
+            } else if (filePath.endsWith(".rcd_sig")) {
+                return SIGNATURE;
+            } else {
+                throw new IllegalArgumentException("Unknown file type: " + filePath);
+            }
+        }
+    }
+}
diff --git a/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/ChainFileAndCount.java b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/ChainFileAndCount.java
new file mode 100644
index 000000000..46cb67d09
--- /dev/null
+++ b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/ChainFileAndCount.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright (C) 2024 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hedera.block.tools.commands.record2blocks.model;
+
+/**
+ * Simple record pairing a ChainFile with the number of identical copies of that file in a record file set.
+ *
+ * @param chainFile a chain file that is one of the common identical ones in a record file set
+ * @param count the number of files that are identical in the record file set
+ */
+public record ChainFileAndCount(ChainFile chainFile, int count) {}
diff --git a/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/NumberedSidecarFile.java b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/NumberedSidecarFile.java
new file mode 100644
index 000000000..9e26b1e63
--- /dev/null
+++ b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/NumberedSidecarFile.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright (C) 2024 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hedera.block.tools.commands.record2blocks.model;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import picocli.CommandLine.Help.Ansi;
+
+/**
+ * NumberedSidecarFile represents a set of sidecar files from all nodes for a single numbered sidecar file for a
+ * record file.
+ *
+ * @param sidecarFileNum the sidecar file number (index)
+ * @param sidecarFiles the list of sidecar files
+ * @param mostCommonSidecarFile the most common sidecar file by MD5 hash
+ */
+public record NumberedSidecarFile(
+        int sidecarFileNum, List<ChainFile> sidecarFiles, ChainFileAndCount mostCommonSidecarFile) {
+
+    /**
+     * Create a NumberedSidecarFile from a list of sidecar files.
+     *
+     * @param sidecarFiles the list of sidecar files
+     */
+    public NumberedSidecarFile(List<ChainFile> sidecarFiles) {
+        this(sidecarFiles.getFirst().sidecarIndex(), sidecarFiles, findMostCommonByMD5(sidecarFiles));
+    }
+
+    /**
+     * Find the most common sidecar file by MD5 hash. If there is more than one with the most common MD5 hash this
+     * just picks any one.
+     *
+     * @param sidecarFiles the list of sidecar files
+     * @return the most common sidecar file and the number of copies with the same MD5 hash
+     */
+    private static ChainFileAndCount findMostCommonByMD5(List<ChainFile> sidecarFiles) {
+        final Entry<String, Long> result = sidecarFiles.stream()
+                .filter(Objects::nonNull)
+                .collect(Collectors.groupingBy(ChainFile::md5, Collectors.counting()))
+                .entrySet()
+                .stream()
+                .max(Map.Entry.comparingByValue())
+                .orElseThrow();
+        final ChainFile mostCommonFile = sidecarFiles.stream()
+                .filter(cf -> cf.md5().equals(result.getKey()))
+                .findFirst()
+                .orElseThrow();
+        return new ChainFileAndCount(mostCommonFile, result.getValue().intValue());
+    }
+
+    /** Template used for rendering to string. */
+    private static final String TEMPLATE = Ansi.AUTO.string("@|bold,yellow NumberedSidecarFile{|@ "
+            + "@|yellow sidecarFileNum=|@$sidecarFileNum, "
+            + "@|yellow sidecarFilesCount=|@$sidecarFilesCount "
+            + "@|yellow mostCommon=|@$mostCommonCount "
+            + "@|bold,yellow }|@");
+
+    @Override
+    public String toString() {
+        return TEMPLATE.replace("$sidecarFileNum", String.valueOf(sidecarFileNum))
+                .replace("$sidecarFilesCount", String.valueOf(sidecarFiles.size()))
+                .replace("$mostCommonCount", String.valueOf(mostCommonSidecarFile.count()));
+    }
+}
diff --git a/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/ParsedSignatureFile.java b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/ParsedSignatureFile.java
new file mode 100644
index 000000000..4ee1dd9ae
--- /dev/null
+++ b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/ParsedSignatureFile.java
@@ -0,0 +1,330 @@
+/*
+ * Copyright (C) 2024 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.tools.commands.record2blocks.model; + +import com.hedera.block.tools.commands.record2blocks.gcp.MainNetBucket; +import com.hedera.hapi.streams.SignatureFile; +import com.hedera.pbj.runtime.ParseException; +import com.hedera.pbj.runtime.io.stream.ReadableStreamingData; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.HexFormat; + +/** + * SignatureFile represents a Hedera record file signature file. There have been 3 versions of the signature files used + * since OA which are V3, V5 and V6. The below tables describe the content that can be parsed from a record signature + * file for each version. + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
+ * <table>
+ *     <caption>Signature File Format V3</caption>
+ *     <tr><th>Name</th><th>Type (Bytes)</th><th>Description</th></tr>
+ *     <tr><td>File Hash Marker</td><td>byte</td><td>Value: 4</td></tr>
+ *     <tr><td>File Hash</td><td>byte[48]</td><td>SHA384 hash of corresponding *.rcd file</td></tr>
+ *     <tr><td>Signature Marker</td><td>byte</td><td>Value: 3</td></tr>
+ *     <tr><td>Length of Signature</td><td>int (4)</td><td>Byte size of the following signature bytes</td></tr>
+ *     <tr><td>Signature</td><td>byte[]</td><td>Signature bytes</td></tr>
+ * </table>
+ * <table>
+ *     <caption>Signature File Format V5</caption>
+ *     <tr><th>Name</th><th>Type (Bytes)</th><th>Description</th></tr>
+ *     <tr><td>Signature File Format Version</td><td>byte</td><td>Value: 5</td></tr>
+ *     <tr><td>Object Stream Signature Version</td><td>int (4)</td><td>Value: 1.
+ *     This defines the format of the remainder of the signature file. This version number is used when parsing a
+ *     signature file with methods defined in swirlds-common package</td></tr>
+ *     <tr><td>Entire Hash of the corresponding stream file</td><td>byte[48]</td><td>SHA384 Hash of the entire
+ *     corresponding stream file</td></tr>
+ *     <tr><td>Signature on hash bytes of Entire Hash</td><td>byte[]</td><td>A signature object generated by signing
+ *     the hash bytes of Entire Hash. See Signature Object table below for details</td></tr>
+ *     <tr><td>Metadata Hash of the corresponding stream file</td><td>byte[48]</td><td>Metadata Hash of the
+ *     corresponding stream file</td></tr>
+ *     <tr><td>Signature on hash bytes of Metadata Hash</td><td>byte[]</td><td>A signature object generated by signing
+ *     the hash bytes of Metadata Hash</td></tr>
+ * </table>
+ * <table>
+ *     <caption>Signature File Format V5 - Signature Object</caption>
+ *     <tr><th>Name</th><th>Type (Bytes)</th><th>Description</th></tr>
+ *     <tr><td>Class ID</td><td>long (8)</td><td>Value: 0x13dc4b399b245c69</td></tr>
+ *     <tr><td>Class Version</td><td>int (4)</td><td>Value: 1</td></tr>
+ *     <tr><td>SignatureType</td><td>int (4)</td><td>Value: 1 - Denotes SHA384withRSA</td></tr>
+ *     <tr><td>Length of Signature</td><td>int (4)</td><td>Size of the signature in bytes</td></tr>
+ *     <tr><td>CheckSum</td><td>int (4)</td><td>101 - length of signature bytes</td></tr>
+ *     <tr><td>Signature bytes</td><td>byte[]</td><td>Serialized Signature bytes</td></tr>
+ * </table>
+ * <table>
+ *     <caption>Signature File Format V6</caption>
+ *     <tr><th>Name</th><th>Type (Bytes)</th><th>Description</th></tr>
+ *     <tr><td>Signature File Format Version</td><td>byte</td><td>Value: 6</td></tr>
+ *     <tr><td>Protobuf Encoded</td><td>byte[]</td><td>Rest of signature file is a protobuf serialized message of
+ *     type com.hedera.hapi.streams.SignatureFile</td></tr>
+ * </table>
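+ * <p>The first byte of the file is enough to tell the three formats apart, which is how parsing below dispatches
+ * (sketch):
+ * <pre>{@code
+ * int firstByte = in.read();
+ * boolean isV3 = firstByte == V2_FILE_HASH_MARKER; // 4, V3 files begin with the file hash marker
+ * boolean isV5 = firstByte == FILE_VERSION_5;      // 5
+ * boolean isV6 = firstByte == FILE_VERSION_6;      // 6, the rest of the file is protobuf
+ * }</pre>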
+ * + * @param nodeId Node ID of the node that signed the file + * @param fileHash SHA384 hash of corresponding *.rcd file + * @param signature Signature bytes or RSA signature of the file hash, signed by the node's private key + */ +public record ParsedSignatureFile(int nodeId, byte[] fileHash, byte[] signature) { + /** + * The marker for the file hash in a V3 signature file. This is the first byte so also acts like a version number. + */ + public static final byte V2_FILE_HASH_MARKER = 4; + + public static final byte FILE_VERSION_5 = 5; + public static final byte FILE_VERSION_6 = 6; + public static final byte V3_SIGNATURE_MARKER = 3; + + /** + * toString for debugging, prints the file hash and signature in hex format. + * + * @return the string representation of the SignatureFile + */ + @Override + public String toString() { + final HexFormat hexFormat = HexFormat.of(); + return "SignatureFile[" + "nodeId=" + + nodeId + ", " + "fileHash=" + + hexFormat.formatHex(fileHash) + ", signature=" + + hexFormat.formatHex(signature) + ']'; + } + + /** + * Download and parse a SignatureFile from a ChainFile. + * + * @param signatureChainFile the chain file for the signature file + * @param mainNetBucket the bucket to download from + * @return the parsed SignatureFile + */ + public static ParsedSignatureFile downloadAndParse(ChainFile signatureChainFile, MainNetBucket mainNetBucket) { + // first download + try (DataInputStream in = new DataInputStream(signatureChainFile.downloadStreaming(mainNetBucket))) { + // extract node ID from file path. This depends on the fixed relationship between node account ids and node + // ids. + final int nodeId = signatureChainFile.nodeAccountId() - 3; + // now parse + final int firstByte = in.read(); + // the first byte is either the file hash marker or a version number in V6 record stream + switch (firstByte) { + case V2_FILE_HASH_MARKER: + final byte[] fileHash = new byte[48]; + in.readFully(fileHash); + if (in.read() != V3_SIGNATURE_MARKER) { + throw new IllegalArgumentException("Invalid signature marker"); + } + final int signatureLength = in.readInt(); + final byte[] signature = new byte[signatureLength]; + in.readFully(signature); + return new ParsedSignatureFile(nodeId, fileHash, signature); + case FILE_VERSION_5: + // check the object stream signature version should be 1 + if (in.readInt() != 1) { + throw new IllegalArgumentException("Invalid object stream signature version"); + } + // read hash object - hash bytes + final byte[] entireFileHash = readHashObject(in); + // read signature object - class id + if (in.readLong() != 0x13dc4b399b245c69L) { + throw new IllegalArgumentException("Invalid signature object class ID"); + } + // read signature object - class version + if (in.readInt() != 1) { + throw new IllegalArgumentException("Invalid signature object class version"); + } + // read signature object - signature type - An RSA signature as specified by the FIPS 186-4 + if (in.readInt() != 1) { + throw new IllegalArgumentException("Invalid signature type"); + } + // read signature object - length of signature + final int signatureLengthV5 = in.readInt(); + // read and check signature object - checksum + if (in.readInt() != 101 - signatureLengthV5) { + throw new IllegalArgumentException("Invalid checksum"); + } + // read signature object - signature bytes + final byte[] signatureV5 = new byte[signatureLengthV5]; + in.readFully(signatureV5); + // we only care about the file metadata hash and the signature so can stop parsing here + return new 
ParsedSignatureFile(nodeId, entireFileHash, signatureV5);
+                case FILE_VERSION_6:
+                    // everything from here on is protobuf encoded
+                    try {
+                        SignatureFile signatureFile = SignatureFile.PROTOBUF.parse(new ReadableStreamingData(in));
+                        if (signatureFile.fileSignature() == null) {
+                            throw new IllegalArgumentException("Invalid signature file, missing file signature");
+                        }
+                        if (signatureFile.fileSignature().hashObject() == null) {
+                            throw new IllegalArgumentException("Invalid signature file, missing hash object");
+                        }
+                        return new ParsedSignatureFile(
+                                nodeId,
+                                signatureFile
+                                        .fileSignature()
+                                        .hashObject()
+                                        .hash()
+                                        .toByteArray(),
+                                signatureFile.fileSignature().signature().toByteArray());
+                    } catch (ParseException e) {
+                        throw new RuntimeException("Error protobuf parsing V6 signature file", e);
+                    }
+                default:
+                    throw new IllegalArgumentException("Invalid first byte [" + firstByte + "] expected "
+                            + V2_FILE_HASH_MARKER + ", " + FILE_VERSION_5 + " or " + FILE_VERSION_6);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Error downloading or parsing signature file", e);
+        }
+    }
+
+    /** The size of a hash object in bytes */
+    public static final int HASH_OBJECT_SIZE_BYTES = Long.BYTES + Integer.BYTES + Integer.BYTES + Integer.BYTES + 48;
+
+    /**
+     * Read a hash object from a data input stream in SelfSerializable SHA384 format.
+     *
+     * @param in the data input stream
+     * @return the hash bytes
+     * @throws IOException if an error occurs reading the hash object
+     */
+    public static byte[] readHashObject(DataInputStream in) throws IOException {
+        // read hash class id
+        if (in.readLong() != 0xf422da83a251741eL) {
+            throw new IllegalArgumentException("Invalid hash class ID");
+        }
+        // read hash class version
+        if (in.readInt() != 1) {
+            throw new IllegalArgumentException("Invalid hash class version");
+        }
+        // read hash object, starting with digest type SHA384
+        if (in.readInt() != 0x58ff811b) {
+            throw new IllegalArgumentException("Invalid digest type not SHA384");
+        }
+        // read hash object - length of hash
+        if (in.readInt() != 48) {
+            throw new IllegalArgumentException("Invalid hash length");
+        }
+        // read hash object - hash bytes
+        final byte[] entireFileHash = new byte[48];
+        in.readFully(entireFileHash);
+        return entireFileHash;
+    }
+}
diff --git a/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/RecordFileInfo.java b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/RecordFileInfo.java
new file mode 100644
index 000000000..a5b7c3d76
--- /dev/null
+++ b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/model/RecordFileInfo.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright (C) 2024 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package com.hedera.block.tools.commands.record2blocks.model; + +import static com.hedera.block.tools.commands.record2blocks.model.ParsedSignatureFile.HASH_OBJECT_SIZE_BYTES; +import static com.hedera.block.tools.commands.record2blocks.model.ParsedSignatureFile.readHashObject; + +import com.hedera.hapi.node.base.SemanticVersion; +import com.hedera.hapi.streams.RecordStreamFile; +import com.hedera.pbj.runtime.io.buffer.Bytes; +import com.hedera.pbj.runtime.io.stream.ReadableStreamingData; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.security.MessageDigest; + +/** + * Represents the version and block hash information of a record file. + *

+ * The old record file formats are documented in the Mirror Node code and in the legacy documentation on the
+ * Way Back Machine.
+ *

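+ * <p>Typical use (sketch; the file path is illustrative):
+ * <pre>{@code
+ * byte[] bytes = Files.readAllBytes(Path.of("2024-07-06T16_42_40.006863632Z.rcd"));
+ * RecordFileInfo info = RecordFileInfo.parse(bytes);
+ * SemanticVersion hapiVersion = info.hapiProtoVersion();
+ * Bytes blockHash = info.blockHash();
+ * }</pre>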
+ * + * @param hapiProtoVersion the HAPI protocol version + * @param blockHash the block hash + * @param recordFileContents the record file contents + */ +public record RecordFileInfo(SemanticVersion hapiProtoVersion, Bytes blockHash, byte[] recordFileContents) { + /* The length of the header in a v2 record file */ + private static final int V2_HEADER_LENGTH = Integer.BYTES + Integer.BYTES + 1 + 48; + + /** + * Parses the record file to extract the HAPI protocol version and the block hash. + * + * @param recordFile the record file bytes to parse + * @return the record file version info + */ + public static RecordFileInfo parse(byte[] recordFile) { + try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(recordFile))) { + final int recordFormatVersion = in.readInt(); + // This is a minimal parser for all record file formats only extracting the necessary information + return switch (recordFormatVersion) { + case 2 -> { + final int hapiMajorVersion = in.readInt(); + final SemanticVersion hapiProtoVersion = new SemanticVersion(hapiMajorVersion, 0, 0, null, null); + // The hash for v2 files is the hash(header, hash(content)) this is different to other versions + // the block hash is not available in the file so we have to calculate it + MessageDigest digest = MessageDigest.getInstance("SHA-384"); + digest.update(recordFile, V2_HEADER_LENGTH, recordFile.length - V2_HEADER_LENGTH); + final byte[] contentHash = digest.digest(); + digest.update(recordFile, 0, V2_HEADER_LENGTH); + digest.update(contentHash); + yield new RecordFileInfo(hapiProtoVersion, Bytes.wrap(digest.digest()), recordFile); + } + case 5 -> { + final int hapiMajorVersion = in.readInt(); + final int hapiMinorVersion = in.readInt(); + final int hapiPatchVersion = in.readInt(); + final SemanticVersion hapiProtoVersion = + new SemanticVersion(hapiMajorVersion, hapiMinorVersion, hapiPatchVersion, null, null); + // skip to last hash object. This trick allows us to not have to understand the format for record + // file items and their contents which is much more complicated. For v5 and v6 the block hash is the + // end running hash which is written as a special item at the end of the file. 
+ in.skipBytes(in.available() - HASH_OBJECT_SIZE_BYTES); + final byte[] endHashObject = readHashObject(in); + yield new RecordFileInfo(hapiProtoVersion, Bytes.wrap(endHashObject), recordFile); + } + case 6 -> { + // V6 is nice and easy as it is all protobuf encoded after the first version integer + final RecordStreamFile recordStreamFile = + RecordStreamFile.PROTOBUF.parse(new ReadableStreamingData(in)); + // For v6 the block hash is the end running hash which is accessed via endObjectRunningHash() + if (recordStreamFile.endObjectRunningHash() == null) { + throw new IllegalStateException("No end object running hash in record file"); + } + yield new RecordFileInfo( + recordStreamFile.hapiProtoVersion(), + recordStreamFile.endObjectRunningHash().hash(), + recordFile); + } + default -> throw new UnsupportedOperationException( + "Unsupported record format version: " + recordFormatVersion); + }; + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/util/BlockWriter.java b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/util/BlockWriter.java new file mode 100644 index 000000000..ba3814a8e --- /dev/null +++ b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/util/BlockWriter.java @@ -0,0 +1,128 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.tools.commands.record2blocks.util; + +import com.github.luben.zstd.ZstdOutputStream; +import com.hedera.hapi.block.stream.Block; +import com.hedera.pbj.runtime.io.stream.WritableStreamingData; +import java.io.IOException; +import java.net.URI; +import java.nio.file.FileSystem; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.util.Map; + +/** + * Utility class for creating paths to block files, in block node format. 
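+ * <p>For example, with the default 3 digits per directory and 1 digit per zip file name, block 3001 lands at
+ * (the base directory "data" is illustrative):
+ * <pre>
+ * data/000/000/000/000/000/3000s.zip : 0000000000000003001.blk.zstd
+ * </pre>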
+ */ +@SuppressWarnings("DataFlowIssue") +public class BlockWriter { + + /** + * Record for block path components + * + * @param dirPath The directory path for the directory that contains the zip file + * @param zipFileName The name of the zip file + * @param blockNumStr The block number as a string + * @param blockFileName The name of the block file in the zip file + */ + public record BlockPath(Path dirPath, String zipFileName, String blockNumStr, String blockFileName) {} + + /** The format for block numbers in file names */ + private static final NumberFormat BLOCK_NUMBER_FORMAT = new DecimalFormat("0000000000000000000"); + /** The extension for compressed block files */ + private static final String COMPRESSED_BLOCK_FILE_EXTENSION = ".blk.zstd"; + /** The number of digits per directory */ + private static final int DEFAULT_DIGITS_PER_DIR = 3; + /** The number of digits per zip file name */ + private static final int DEFAULT_DIGITS_PER_ZIP_FILE_NAME = 1; + + /** + * Write a block to a zip file + * + * @param baseDirectory The base directory for the block files + * @param block The block to write + * @throws IOException If an error occurs writing the block + * @return The path to the block file + */ + public static BlockPath writeBlock(final Path baseDirectory, final Block block) throws IOException { + // get block number from block header + final var firstBlockItem = block.items().getFirst(); + final long blockNumber = firstBlockItem.blockHeader().number(); + // compute block path + final BlockPath blockPath = computeBlockPath(baseDirectory, blockNumber); + // create directories + Files.createDirectories(blockPath.dirPath); + // create zip file path + final Path zipPath = blockPath.dirPath.resolve(blockPath.zipFileName); + // append block to zip file, creating zip file if it doesn't exist + try (FileSystem fs = FileSystems.newFileSystem( + URI.create("jar:" + zipPath.toUri()), Map.of("create", "true", "compressionMethod", "STORED"))) { + Path blockPathInZip = fs.getPath(blockPath.blockFileName); + try (WritableStreamingData out = new WritableStreamingData(new ZstdOutputStream( + Files.newOutputStream(blockPathInZip, StandardOpenOption.CREATE, StandardOpenOption.WRITE)))) { + Block.PROTOBUF.write(block, out); + } + } + // return block path + return blockPath; + } + + /** + * Compute the path to a block file + * + * @param baseDirectory The base directory for the block files + * @param blockNumber The block number + * @return The path to the block file + */ + private static BlockPath computeBlockPath(final Path baseDirectory, long blockNumber) { + // convert block number to string + final String blockNumberStr = BLOCK_NUMBER_FORMAT.format(blockNumber); + // split string into digits for zip and for directories + final int offsetToZip = blockNumberStr.length() - DEFAULT_DIGITS_PER_ZIP_FILE_NAME - DEFAULT_DIGITS_PER_DIR; + final String directoryDigits = blockNumberStr.substring(0, offsetToZip); + final String zipFileNameDigits = + blockNumberStr.substring(offsetToZip, offsetToZip + DEFAULT_DIGITS_PER_ZIP_FILE_NAME); + // start building path to zip file + Path dirPath = baseDirectory; + for (int i = 0; i < directoryDigits.length(); i += DEFAULT_DIGITS_PER_DIR) { + final String dirName = + directoryDigits.substring(i, Math.min(i + DEFAULT_DIGITS_PER_DIR, directoryDigits.length())); + dirPath = dirPath.resolve(dirName); + } + // create zip file name + final String zipFileName = zipFileNameDigits + "000s.zip"; + final String fileName = blockNumberStr + COMPRESSED_BLOCK_FILE_EXTENSION; + return 
new BlockPath(dirPath, zipFileName, blockNumberStr, fileName);
+    }
+
+    /**
+     * Simple main method to test the block path computation
+     *
+     * @param args The command line arguments
+     */
+    public static void main(String[] args) {
+        for (long blockNumber = 0; blockNumber < 3002; blockNumber++) {
+            final var blockPath = computeBlockPath(Path.of("data"), blockNumber);
+            System.out.println("blockPath = " + blockPath);
+        }
+    }
+}
diff --git a/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/util/RecordFileDates.java b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/util/RecordFileDates.java
new file mode 100644
index 000000000..c27de0f46
--- /dev/null
+++ b/tools/src/main/java/com/hedera/block/tools/commands/record2blocks/util/RecordFileDates.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright (C) 2024 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hedera.block.tools.commands.record2blocks.util;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.format.DateTimeParseException;
+
+/**
+ * Utility class to help with record file dates.
+ *

+ * Record file names look like "2024-07-06T16_42_40.006863632Z.rcd.gz"; the first part is a date-time in ISO format
+ * in the UTC time zone.
+ * <p>
+ * Block times are longs in nanoseconds since the first block after OA. They allow very condensed storage of times
+ * in the Hedera blockchain history.
+ *
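+ * <p>For example, converting between the two representations with the helpers below (the file name is
+ * illustrative):
+ * <pre>{@code
+ * long t = RecordFileDates.recordFileNameToBlockTimeLong("2024-07-06T16_42_40.006863632Z.rcd.gz");
+ * Instant back = RecordFileDates.blockTimeLongToInstant(t); // 2024-07-06T16:42:40.006863632Z
+ * }</pre>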

+ */ +@SuppressWarnings("unused") +public final class RecordFileDates { + /** the record file time of the first block after OA */ + public static final String FIRST_BLOCK_TIME = "2019-09-13T21_53_51.396440Z"; + /** the record file time of the first block after OA as Instant */ + public static final Instant FIRST_BLOCK_TIME_INSTANT = Instant.parse(FIRST_BLOCK_TIME.replace('_', ':')); + /** one hour in nanoseconds */ + public static final long ONE_HOUR = Duration.ofHours(1).toNanos(); + /** one day in nanoseconds */ + public static final long ONE_DAY = Duration.ofDays(1).toNanos(); + + /** + * Extract the record file time from a record file name. + * + * @param recordOrSidecarFileName the record file name, like "2024-07-06T16_42_40.006863632Z.rcd.gz" or a sidecar + * file name like "2024-07-06T16_42_40.006863632Z_02.rcd.gz" + * @return the record file time as an Instant + */ + public static Instant extractRecordFileTime(String recordOrSidecarFileName) { + String dateString; + // check if a sidecar file + if (recordOrSidecarFileName.contains("Z_")) { + dateString = recordOrSidecarFileName + .substring(0, recordOrSidecarFileName.lastIndexOf("_")) + .replace('_', ':'); + } else { + dateString = recordOrSidecarFileName + .substring(0, recordOrSidecarFileName.indexOf(".rcd")) + .replace('_', ':'); + } + try { + return Instant.parse(dateString); + } catch (DateTimeParseException e) { + throw new RuntimeException( + "Invalid record file name: \"" + recordOrSidecarFileName + "\" - dateString=\"" + dateString + "\"", + e); + } + } + + /** + * Convert a block time long to an instant. + * + * @param blockTime the block time in nanoseconds since the first block after OA + * @return the block time instant + */ + public static Instant blockTimeLongToInstant(long blockTime) { + return FIRST_BLOCK_TIME_INSTANT.plusNanos(blockTime); + } + + /** + * Convert an instant to a block time long. + * + * @param instant the instant + * @return the block time in nanoseconds since the first block after OA + */ + public static long instantToBlockTimeLong(Instant instant) { + return Duration.between(FIRST_BLOCK_TIME_INSTANT, instant).toNanos(); + } + + /** + * Convert an instant in time to a block time long. + * + * @param dateTime the time instant + * @return the block time in nanoseconds since the first block after OA + */ + public static long blockTimeInstantToLong(Instant dateTime) { + return Duration.between(FIRST_BLOCK_TIME_INSTANT, dateTime).toNanos(); + } + + /** + * Convert a record file name to a block time long. + * + * @param recordFileName the record file name, like "2024-07-06T16_42_40.006863632Z.rcd.gz" + * @return the block time in nanoseconds since the first block after OA + */ + public static long recordFileNameToBlockTimeLong(String recordFileName) { + return blockTimeInstantToLong(extractRecordFileTime(recordFileName)); + } + + /** + * Convert a block time long to a record file prefix string. 
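+     * <p>For example, a block time of {@code 0} yields {@code "2019-09-13T21_53_51.396440"}.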
+ * + * @param blockTime the block time in nanoseconds since the first block after OA + * @return the record file prefix string, like "2019-09-13T21_53_51.39644" + */ + public static String blockTimeLongToRecordFilePrefix(long blockTime) { + String blockTimeString = blockTimeLongToInstant(blockTime).toString().replace(':', '_'); + // remove the 'Z' at the end + blockTimeString = blockTimeString.substring(0, blockTimeString.length() - 1); + return blockTimeString; + } +} diff --git a/tools/src/main/java/module-info.java b/tools/src/main/java/module-info.java deleted file mode 100644 index 098aaa36e..000000000 --- a/tools/src/main/java/module-info.java +++ /dev/null @@ -1,15 +0,0 @@ -/** Runtime module of block stream tools. */ -module com.hedera.block.tools { - exports com.hedera.block.tools; - - opens com.hedera.block.tools to - info.picocli; - opens com.hedera.block.tools.commands to - info.picocli; - - requires static com.github.spotbugs.annotations; - requires static com.google.auto.service; - requires com.hedera.block.stream; - requires com.hedera.pbj.runtime; - requires info.picocli; -}