diff --git a/.github/workflows/config/node-release.yaml b/.github/workflows/config/node-release.yaml index d97f02c2b6cf..029a80e4f33d 100644 --- a/.github/workflows/config/node-release.yaml +++ b/.github/workflows/config/node-release.yaml @@ -1,11 +1,11 @@ release: branching: execution: - time: "19:00:00" + time: "20:00:00" schedule: - - on: "2023-12-01" - name: release/0.45 + - on: "2023-02-27" + name: release/0.48 initial-tag: - create: true + create: false name: v0.45.0-alpha.2 diff --git a/hedera-node/docs/design/services/smart-contract-service/atomic-crypto-transfer.md b/hedera-node/docs/design/services/smart-contract-service/atomic-crypto-transfer.md new file mode 100644 index 000000000000..e9251d57da27 --- /dev/null +++ b/hedera-node/docs/design/services/smart-contract-service/atomic-crypto-transfer.md @@ -0,0 +1,76 @@ +# Atomic Crypto Transfer + +## Purpose + +[HIP-206](https://hips.hedera.com/hip/hip-206) defines functionality to perform atomic crypto transfers of value - hbar and tokens - between accounts using the IHederaTokenService interface. This document will define the architecture and implementation of this functionality. + +## Goals + +Describe the implementation of the atomic crypto transfer functionality in the Hedera Smart Contract Service. + +## Non Goals + +- The implementation of the atomic crypto transfer functionality in the mono service codebase will not be discussed. +- The implementation details of crypto `allowance` and `approvals` for hbar from within smart contracts is outside the scope of this document and will be described in a separate document. + +## Architecture + +The architecture for atomic crypto transfer follows the existing framework defined for handling all calls to the HederaTokenService system contract in the modularization services and is described in more detail in the `Implementation` section below. 
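To make the atomicity requirement concrete: every movement of hbar and tokens in a transfer list must succeed or fail as a unit. The toy Python sketch below is illustrative only (invented names, not Hedera code); it shows the all-or-nothing shape of such a transfer, where the list must net to zero and a single failing movement leaves all balances untouched.

```python
from dataclasses import dataclass

@dataclass
class Movement:
    account: str
    amount: int  # negative = debit, positive = credit (in the smallest unit)

def atomic_transfer(balances: dict[str, int], movements: list[Movement]) -> dict[str, int]:
    """Apply a transfer list atomically: validate everything, then commit - or change nothing."""
    if sum(m.amount for m in movements) != 0:
        raise ValueError("transfer list must net to zero")
    tentative = dict(balances)  # work on a copy so a failure leaves `balances` untouched
    for m in movements:
        tentative[m.account] = tentative.get(m.account, 0) + m.amount
        if tentative[m.account] < 0:
            raise ValueError(f"insufficient balance for {m.account}")
    return tentative
```

The real service applies the same idea to a `TransactionBody` containing hbar and token transfer lists; the sketch only captures the commit-or-rollback semantics.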

## Implementation
Handling token-related functionality is generally performed cooperatively by the smart contract service module and the token service module. The following classes are used to handle the call to the atomic crypto transfer function:

### Smart Contract Service Module
1. The smart contract service implements a `CustomMessageCallProcessor` class which overrides the Besu `MessageCallProcessor` class in order to potentially intercept calls to system contract addresses.
If the contract address of the current call is determined to be the contract address for the HederaTokenService (0x167), the call is redirected to the `HtsSystemContract` class for processing.
2. The `HtsSystemContract` creates an instance of the `HtsCallAttempt` class from the input bytes and the current message frame to encapsulate the call.
3. The `HtsCallAttempt` class iterates through a list of `Translator` classes (provided by Dagger) in order to determine which translator will be responsible for processing the call, by attempting to match the call's
function signature to a signature known by the translator. In the case of atomic crypto transfer calls, the `ClassicTransferTranslator` class will be responsible for processing calls with the following function signature: \
```cryptoTransfer(((address,int64,bool)[]),(address,(address,int64,bool)[],(address,address,int64,bool)[])[])```
4. The `ClassicTransferTranslator` class will call the `ClassicTransferDecoder` class to decode the parameters of the call and translate the encoded parameters into a `TransactionBody` object.
5. A class called `ClassicTransfersCall` will then take the created `TransactionBody` object and dispatch a new transaction to the Token Service Module for processing.
- Before dispatching, the relevant feature flag `contracts.precompile.atomicCryptoTransfer.enabled=true` will be checked.
- A `VerificationStrategy` class will be provided to the token service during dispatch in order to ensure that the security model is adhered to.
- The `ClassicTransfersCall` class is also responsible for other miscellaneous tasks such as checking for sufficient gas and encoding the response.

![image info](./class_diagram.drawio.png)

### Token Service Module

Once the smart contract service dispatches the transaction to the Hedera Token Service, the following steps are performed:

1. Validate the semantic correctness of the transaction.
2. Handle any aliases found and potentially create hollow accounts as necessary.
3. Handle auto associations.
4. Add transfers to the transfer list.
5. Handle custom fees and add the resulting transfers to the transfer list.
6. Perform the transfers between accounts using the transfer list.
7. Create the resulting record and return it to the caller.

The implementation can be found starting in the `CryptoTransferHandler` class.

## Acceptance Tests

As outlined above, most of the implementation of the atomic crypto transfer functionality has already been completed. What remains is to write acceptance tests
to validate the functionality, with a particular emphasis on security and edge cases, and to fix issues as they arise. Below is a diagram that enumerates various transfer scenarios that need to be tested.

![image info](./transfer_scenarios.drawio.png)


### BDD Tests

BDD tests are required to cover security concerns which require complex signing scenarios. Many of these tests
are already implemented and need not be repeated as XTests.

#### Positive Tests
- Successful transfer of hbars only and HTS tokens only between accounts from the sender account via a contract.
- Successful transfer of hbars and HTS tokens with custom fees (including fallback fee scenarios).
- Successful transfer of hbars and HTS tokens with available auto token association slots on the receiver.
- Successful transfer of hbars and HTS tokens from an EOA account given approval to another EOA.
- Successful transfer of hbars and HTS tokens from an EOA account given approval to a caller contract.
- Successful transfer of hbars and HTS tokens from an owner contract via a transfer contract.

#### Negative Tests

- Failure when attempting to transfer from special system accounts.
- Failure when receiver signature is required and not provided. diff --git a/hedera-node/docs/design/services/smart-contract-service/class_diagram.drawio.png b/hedera-node/docs/design/services/smart-contract-service/class_diagram.drawio.png new file mode 100644 index 000000000000..77bff8ad765f Binary files /dev/null and b/hedera-node/docs/design/services/smart-contract-service/class_diagram.drawio.png differ diff --git a/hedera-node/docs/design/services/smart-contract-service/transfer_scenarios.drawio.png b/hedera-node/docs/design/services/smart-contract-service/transfer_scenarios.drawio.png new file mode 100644 index 000000000000..302c49b4f35c Binary files /dev/null and b/hedera-node/docs/design/services/smart-contract-service/transfer_scenarios.drawio.png differ diff --git a/hedera-node/hedera-app/src/xtest/java/README.md b/hedera-node/hedera-app/src/xtest/java/README.md new file mode 100644 index 000000000000..faffc9669332 --- /dev/null +++ b/hedera-node/hedera-app/src/xtest/java/README.md @@ -0,0 +1,47 @@ +# XTests

## Introduction
XTests are intended to fill the gap between unit tests and integration tests.
They are designed to test the functionality of a service module in its entirety, as well as
interactions between modules.
## Goals and Benefits
- Easier to write, read, understand, and debug into than the HAPI suite BDDs, because they exercise the service (or services) directly through its API instead of being an end-to-end test where a HAPI transaction goes in and a record stream comes out, with all kinds of concurrency and asynchronicity happening in between, and the measurement of the effect being rather indirect.
- At the same time, easier to test edge cases with, because you can hand-craft test data using the actual data structures taken by the API, and can thus drive the system into states it can't normally get to by using only valid HAPI transactions.
- A more complete test of the service-under-test than (our current) unit tests, because mocking is extremely discouraged. So you're actually testing the real code instead of hard-wiring into the test a bunch of assumptions about how the real code is supposed to work.
- And yet they should have the flavor of a unit test: small, tightly focused on one specific issue and in fact one specific edge case (and thus compartmentalized from other tests testing in the same area).
- xTests are ideally suited for testing and understanding inter-module interactions, such as dispatching transactions from the smart contract service to the token service.
- xTests execute more quickly and thus allow the developer to iterate between test and functionality more efficiently than with integration tests.

## When xTests Are Not Appropriate

While xTests offer benefits as described above, they are not appropriate for all testing scenarios.
Some specific scenarios where xTests are not appropriate include:

- Any test that requires multiple nodes to be running
- Tests that require complex signing/signature verification
- Any test where the existing scaffolding infrastructure needs to be significantly modified (although this will likely become more flexible over time)

## Mocking

xTests support mocking of objects just as in any unit test; however, in general mocking is discouraged unless the mocked object has no bearing on the functionality under test. One objective of xTests is to understand potential side effects of tested functionality, which may be hidden by excessive mocking.

## xTest Structure and Examples

Just as during regular execution of nodes, xTests depend on Dagger to provide the necessary dependencies such as state, context, fees and configuration.
The classes that provide these basic scaffolding dependencies are
[BaseScaffoldingComponent](https://github.com/hashgraph/hedera-services/blob/develop/hedera-node/hedera-app/src/xtest/java/common/BaseScaffoldingComponent.java) and [BaseScaffoldingModule](https://github.com/hashgraph/hedera-services/blob/develop/hedera-node/hedera-app/src/xtest/java/common/BaseScaffoldingModule.java).

There is a base class for all xTests called `AbstractXTest` which provides basic, common, and useful functionality. In particular, the [scenarioPasses()](https://github.com/hashgraph/hedera-services/blob/develop/hedera-node/hedera-app/src/xtest/java/common/AbstractXTest.java#L119)
method provides the structure for a test execution - from setup to scenario execution to assertions.

Currently, xTests have been written for the smart contract service (primarily centered around system contracts) and the token service.
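The `scenarioPasses()` structure described above - set up state, run the scenario, assert the outcome - is the classic template-method pattern. A minimal sketch follows; the real classes are Java, so this Python analogue uses invented names purely to show the shape of a scenario test.

```python
class AbstractXTestSketch:
    """Illustrative analogue of the Java AbstractXTest template (names invented)."""

    def scenario_passes(self):
        self.set_up_initial_state()    # accounts, tokens, aliases, token rels, ...
        self.do_scenario_operations()  # the scenario under test, supplied by the subclass
        self.assert_expected_state()   # verify the resulting state

    def set_up_initial_state(self): ...

    def do_scenario_operations(self):
        raise NotImplementedError

    def assert_expected_state(self): ...


class TransferXTestSketch(AbstractXTestSketch):
    """A toy 'transfer' scenario: one hand-crafted edge case per test class."""

    def set_up_initial_state(self):
        self.balances = {"sender": 10, "receiver": 0}

    def do_scenario_operations(self):
        self.balances["sender"] -= 10
        self.balances["receiver"] += 10

    def assert_expected_state(self):
        assert self.balances == {"sender": 0, "receiver": 10}
```

Keeping setup and assertions in the subclass is what makes each xTest small and compartmentalized: one class, one edge case.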
All xTests for the smart contract service should descend from [AbstractContractXTest](https://github.com/hashgraph/hedera-services/blob/develop/hedera-node/hedera-app/src/xtest/java/contract/AbstractContractXTest.java),
which provides common functionality such as calling smart contract handlers with synthetic transactions and calling system contract functions.
Several prototypical examples to illustrate common xTest patterns are
[AssociationsXTest](https://github.com/hashgraph/hedera-services/blob/develop/hedera-node/hedera-app/src/xtest/java/contract/AssociationsXTest.java),
[ClassicViewsXTest](https://github.com/hashgraph/hedera-services/blob/develop/hedera-node/hedera-app/src/xtest/java/contract/ClassicViewsXTest.java)
and [HtsErc20TransfersXTest](https://github.com/hashgraph/hedera-services/blob/develop/hedera-node/hedera-app/src/xtest/java/contract/HtsErc20TransfersXTest.java).

Generally, xTests follow the pattern of setting up initial structures (accounts, tokens, aliases, tokenRels, etc.), executing a scenario and then asserting the results.
The scenarios are defined by overriding the `doScenarioOperations()` method in the xTest class. \ No newline at end of file diff --git a/hedera-node/test-clients/scripts/diff-testing/00-steps.md b/hedera-node/test-clients/scripts/diff-testing/00-steps.md new file mode 100644 index 000000000000..602128cda431 --- /dev/null +++ b/hedera-node/test-clients/scripts/diff-testing/00-steps.md @@ -0,0 +1,127 @@

## GCP setup

* `gcloud components update`
* `gcloud init` -- if needed
  * pick zone `us-west1-c` -- arbitrarily (?)
* `gcloud auth application-default login` -- and sign in w/ account

## Python setup
* have `venv` available, install `3.12.0` (or something else recent)
* `pip install -r requirements.txt`
  * `pip install google-cloud-storage`
* `open /Applications/Python\ 3.12/Install\ Certificates.command`

## Get hostnames from `swirlds/infrastructure` repo
* see `mainnet/ansible/host_vars_mainnet/node*.yml`
* `grep source_hostname * | sed 's/[.]yml:source_hostname://g'`
* use file `mainnet_hostnames-2024-02-04.txt`

## To run:

First you use the script to create a file that contains the names of all the
stream files in the interval that are up there in Google Cloud Storage.
(And this file contains other data, such as the size of the file, and
whether or not it has been downloaded already.)

Then you use that file to download all the data files (which Google Cloud
Storage calls "blobs").

Easy. Except that, due to cloud and network problems, the smooth processing of
calls to Google Cloud Storage may get interrupted from time to time. So the
script allows you to _rerun_ both phases until they're finally complete
(keeping track, via that file created in the first phase, of stuff that's
already been done, so it isn't done again).

1. First create the list of blobs (record/event files) you want to download from gcp:
   ```bash
   python3 main.py get_blob_names -root \
       -b \
       -s -e \
       -node 
   ```
   where the start and end of the interval are specified like `2024-02-01T00:00:00`,
   and the node number is the node you want to pull files for.

   This will tell you how many files it found in that interval.

   But it may happen that the node you picked was down for some time during
   that interval. So run the script again using the command `reget_blob_names`
   (instead of `get_blob_names`) and specify a different node number. It
   will _merge_ additional files found into the bloblist you already
   have. Repeat until it finds no new files.

2.
Download the blobs with the command + ```bash + python3 main.py download_blobs -root \ + -b + ``` + It will fetch files in batches - and give you a progress report on + how many batches it is doing. + + Files can fail to download. Keep repeating this command until you see, + by the metrics reported, that all the files are downloaded. + + You can "tune" the performance by changing the batch size with the + `-batch nnn` argument, and by changing the level of concurrency with the + `-concurrency nnn` argument. + +## More explanation + +- What are all these "blobs" in the names of things in the source code? +"Blob" is what Google Cloud Storage calls a "file". + +- All nodes put the streams out on GCP, even though they're _identical_ (except for the + signatures, of course). So you can pick any node number (1..29) to pull +from, your choice. But sometimes nodes go down so the data is missing out +there (without any indication). So this script lets you _rerun_ getting the +names of all the files, using a different node number ... you do that until +no new files are discovered. Then, presumably, you have a full set of +stream file names. + + But the node you pull from doesn't matter. And so the default chosen by +the script, if you don't specify one, is arbitrary. + +- Speaking of nodes, nodes are always referred to by their _number_ (1.. +29/30). And that's true whether the script calls it a "number" or an "id" or +whatever. + +- This script - and I still think of it as a script - has grown a bit. It +might be a bit larger than a "script" now ... but there it is. And so, even +though it is not good software engineering to use _global variables_ this +script does use global variables. And since those global variables are +referred to everywhere, even though it is not good software engineering to +use _single character variable names **especially** for global variables_ +this script _does_ use single character variable names to refer to global +variables. 
Because I wanted the minimum syntax overhead to specify the
context. Because it's necessary to have the context for readability, but
too much of it is just visual cruft.

  - `a` is the global holder of command line arguments. All command line
arguments are in there, with default values for those that weren't actually
on the command line. It is of the class `Args`, and both are declared in
`cli_arguments.py` and imported elsewhere.
  - `g` is the global holder of global variables. It is of the class
`Globals`, and both are declared in `globals.py` and imported elsewhere.

  And unfortunately importing a variable doesn't work the way I expected in
Python. `from module import name` binds the name once, at import time: if the
source module later _rebinds_ that variable, the importing module keeps seeing
the object that was bound originally (mutating the shared object works fine,
though). Anyway, it hasn't impacted things so far but it's something to be
aware of and I'll probably have to fix it if the script has further
developments.
 diff --git a/hedera-node/test-clients/scripts/diff-testing/00-steps.pdf b/hedera-node/test-clients/scripts/diff-testing/00-steps.pdf new file mode 100644 index 000000000000..6c55b29cec66 Binary files /dev/null and b/hedera-node/test-clients/scripts/diff-testing/00-steps.pdf differ diff --git a/hedera-node/test-clients/scripts/diff-testing/bloblist.py b/hedera-node/test-clients/scripts/diff-testing/bloblist.py new file mode 100644 index 000000000000..9462de1a0cee --- /dev/null +++ b/hedera-node/test-clients/scripts/diff-testing/bloblist.py @@ -0,0 +1,89 @@ +# Copyright (C) 2024 Hedera Hashgraph, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import copy
from dataclasses import dataclass, replace as clone
from datetime import datetime
from typing import List, Set, Optional

import jsons

from utils import *
from versioned_output_file import VersionedOutputFile


@dataclass
class Blob:
    name: str
    type: str
    have: bool
    node: int
    crc32c: str  # base64, big-endian
    md5hash: str  # base64, big-endian (presumably)
    size: int

    def __copy__(self):
        return clone(self)

    def __hash__(self):
        """For these (mutable) objects, the 'primary key' is the type+name"""
        return hash((self.name, self.type))


@dataclass
class BlobList:
    interval_start_time: datetime
    interval_end_time: datetime
    nodes: Set[int]
    blobs: List[Blob]

    def __len__(self):
        return len(self.blobs)

    def __copy__(self):
        return clone(self)

    def __deepcopy__(self, memo):
        bl = clone(self)
        bl.nodes = bl.nodes.copy()
        bl.blobs = [copy.copy(b) for b in bl.blobs]
        return bl

    @staticmethod
    def save(path: str, bloblist: BlobList) -> None:
        contents = jsons.dumps(bloblist, ensure_ascii=False, jdkwargs={"indent": 4})
        with VersionedOutputFile(path) as f:
            f.write(contents)

    @staticmethod
    def load(filename: str) -> Optional[BlobList]:
        try:
            with open(filename, 'r', encoding='utf-8') as file:
                contents = file.read()
        except IOError as ex:
            print(f"*** BlobList.load({filename}): IOError: {ex}")
            contents = None
        bloblist = jsons.loads(contents, BlobList) if contents is not None else None
        if bloblist is None:
            return None

        # Our use of datetime.fromisoformat needs naive datetimes, so strip the timezone that the jsons package puts there
        bloblist.interval_start_time = naiveify(bloblist.interval_start_time)
        bloblist.interval_end_time = naiveify(bloblist.interval_end_time)
        return bloblist


# For various reasons related to formatting blob names we use naive datetime - without
timezones - so we must tell +# jsons library not to complain about that +jsons.suppress_warning('datetime-without-tz') diff --git a/hedera-node/test-clients/scripts/diff-testing/cli_arguments.py b/hedera-node/test-clients/scripts/diff-testing/cli_arguments.py new file mode 100644 index 000000000000..114ab7650554 --- /dev/null +++ b/hedera-node/test-clients/scripts/diff-testing/cli_arguments.py @@ -0,0 +1,79 @@ +# Copyright (C) 2024 Hedera Hashgraph, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime +from typing import Literal, Optional + +from tap import Tap + +from configuration import Configuration as C + + +class Args(Tap): + action: Literal['get_blob_names', + 'reget_blob_names', + 'download_blobs', + 'load_bloblist'] # action (command) to perform + local_storage_rootdir: str # root directory where all files should be stored + bloblist_filename: str # filename (rel. to root) for the blob list and blobs status + hostnamemap_filename: str # filename (rel. 
to root) for the node# <==> hostname map
    interval_start_time: datetime  # start of the data-pull interval
    interval_end_time: datetime  # end of the data-pull interval
    node: int  # node to get blobs from
    n_batch_size: int  # number of blobs to download in a batch
    concurrent_type: Literal['thread', 'process']
    n_concurrent: int  # number of processes to use for downloads

    def configure(self):
        self.add_argument('action')
        self.add_argument('-r', '-root', '--local-storage-rootdir', default=C.LOCAL_STORAGE_ROOT)
        self.add_argument('-b', '-bloblist', '--bloblist-filename', default=C.MAINNET_BLOBLIST_FILENAME)
        self.add_argument('-m', '-hosts', '--hostnamemap-filename', default=C.MAINNET_HOSTNAMES_FILENAME)
        self.add_argument('-s', '-start', '--interval-start-time', default=C.INTERVAL_START_TIME,
                          type=Args.to_datetime)
        self.add_argument('-e', '-end', '--interval-end-time', default=C.INTERVAL_END_TIME,
                          type=Args.to_datetime)
        self.add_argument('-n', '-node', '--node', default=C.DEFAULT_NODE_NUMBER)
        self.add_argument('-t', '-batch', '--n_batch-size', default=C.DOWNLOAD_BATCH_SIZE)
        self.add_argument('-w', '-worker', '--concurrent_type', default=C.CONCURRENT_TYPE)
        self.add_argument('-c', '-concurrency', '--n_concurrent', default=C.CONCURRENT_DOWNLOADS)

    @staticmethod
    def to_datetime(s: str):
        return datetime.fromisoformat(s)

    def process_arguments(self):
        if isinstance(self.interval_start_time, str): self.interval_start_time = datetime.fromisoformat(self.interval_start_time)
        if isinstance(self.interval_end_time, str): self.interval_end_time = datetime.fromisoformat(self.interval_end_time)

    @staticmethod
    def parse():
        return Args().parse_args()

    def __str__(self):
        return f"""Command arguments:
    action:     {self.action}
    root:       {self.local_storage_rootdir}
    bloblist:   {self.bloblist_filename}
    hosts:      {self.hostnamemap_filename}
    start:      {self.interval_start_time}
    end:        {self.interval_end_time}
    node:       {self.node}
    batch size:
{self.n_batch_size} + worker type:{self.concurrent_type} + #workers: {self.n_concurrent} +""" + + +a: Optional[Args] = None diff --git a/hedera-node/test-clients/scripts/diff-testing/configuration.py b/hedera-node/test-clients/scripts/diff-testing/configuration.py new file mode 100644 index 000000000000..b31badb5eeff --- /dev/null +++ b/hedera-node/test-clients/scripts/diff-testing/configuration.py @@ -0,0 +1,50 @@ +# Copyright (C) 2024 Hedera Hashgraph, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass +from datetime import datetime + + +@dataclass +class Configuration: + # paths and filenames + LOCAL_STORAGE_ROOT: str = '/users/user/new-state/' # where the states and streams will be stored + MAINNET_HOSTNAMES_FILENAME: str = 'mainnet-hostnames-2024-02-04.txt' # file containing node# <-> hostname map + MAINNET_BLOBLIST_FILENAME: str = 'bloblist.txt' # file for ongoing state (blobs to fetch, blobs fetched, etc.) 
+ + # interval to pull - start and end must _not_ cross 00:00Z (i.e., must be on same GMT day, except can have 00:00Z + # at either end) + INTERVAL_START_TIME: datetime = datetime.fromisoformat('2024-02-01T00:00:00') + INTERVAL_END_TIME: datetime = datetime.fromisoformat('2024-02-01T03:00:00') # '2024-02-02T00:00:00') + + # nodes + N_NODES: int = 29 + DEFAULT_NODE_NUMBER: int = 5 + + # gcp buckets + GCP_PROJECT: str = 'mainnet-exports' + NETWORK: str = 'mainnet' + GCP_EXPORT_BUCKET: str = 'hedera-mainnet-streams' + + # gcp download parameters + USE_RAW_DOWNLOADS: bool = False + DOWNLOAD_BATCH_SIZE: int = 100 + CONCURRENT_TYPE: str = 'thread' + CONCURRENT_DOWNLOADS: int = 5 + + # gcp networking parameters + GCP_SERVER_TIMEOUT: float = 180.0 + GCP_RETRY_INITIAL_WAIT_TIME: float = 1.5 + GCP_RETRY_WAIT_TIME_MULTIPLIER: float = 1.5 + GCP_RETRY_MAXIMUM_WAIT_TIME: float = 100.0 diff --git a/hedera-node/test-clients/scripts/diff-testing/gcp_access.py b/hedera-node/test-clients/scripts/diff-testing/gcp_access.py new file mode 100644 index 000000000000..28ef6370cbb9 --- /dev/null +++ b/hedera-node/test-clients/scripts/diff-testing/gcp_access.py @@ -0,0 +1,170 @@ +# Copyright (C) 2024 Hedera Hashgraph, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +import re +import warnings + +from google.cloud import storage +from google.cloud.storage import transfer_manager +from google.cloud.storage.retry import DEFAULT_RETRY + +from configuration import Configuration as C +from globals import g +from utils import split_list + + +def init_gcp_and_get_streams_bucket(): + """Initialize gcp library and return the interesting gcp bucket""" + g.gcp_retry_instance = DEFAULT_RETRY.with_delay(initial=C.GCP_RETRY_INITIAL_WAIT_TIME, + multiplier=C.GCP_RETRY_WAIT_TIME_MULTIPLIER, + maximum=C.GCP_RETRY_MAXIMUM_WAIT_TIME) + + print(f"Getting gcp bucket {g.gcp_export_bucket}") + + gcp_storage = storage.Client(project=g.gcp_project) + bucket = gcp_storage.bucket(g.gcp_export_bucket) # this returns a Bucket you can access blobs with + return gcp_storage, bucket + + +# Fetch file list + +def get_blobs_in_bucket(bucket, blob_glob, **kwargs): + """Given a blob glob - and optional lower and high blob names (w.r.t. timestamp) return all gcp Blobs that match""" + unambiguous_prefix = blob_glob.split('*')[0] + first_and_last_names = kwargs.get('first_and_last_names') + if first_and_last_names is None: + blobs = bucket.list_blobs(prefix=unambiguous_prefix, delimiter='/', match_glob=blob_glob, + timeout=C.GCP_SERVER_TIMEOUT, retry=g.gcp_retry_instance, + max_results=100000) + else: + blobs = bucket.list_blobs(prefix=unambiguous_prefix, delimiter='/', match_glob=blob_glob, + start_offset=first_and_last_names[0], end_offset=first_and_last_names[1], + timeout=C.GCP_SERVER_TIMEOUT, retry=g.gcp_retry_instance, + max_results=100000) + return [b for b in blobs] + + +# Fetch file content + +def download_blobs(bucket, blobs_to_get, actually_present_blobs, a): + blob_list = sorted(blobs_to_get, key=lambda b: b.name) + # get batches + print(f"download_blobs: a = {a}") + blob_batches = [bb for bb in split_list(a.n_batch_size, blob_list)] + + n_attempted = 0 + n_succeeded = 0 + n_failed = 0 + n_skipped_because_present = 
len(actually_present_blobs)

    n_batch = 0
    for batch in blob_batches:
        print(f"Download batch {n_batch} of {len(blob_batches)} ({a.n_batch_size} at a time)")

        # Need to munge the _destination_ names so they do _not_ include the node number
        blob_file_pairs = [(bucket.blob(b.name),
                            os.path.join(a.local_storage_rootdir, strip_blob_node_number(b.name)))
                           for b in batch]
        for blob_file_pair in blob_file_pairs:
            directory, _ = os.path.split(blob_file_pair[1])
            os.makedirs(directory, exist_ok=True)

        batch_results = (
            transfer_manager.download_many(blob_file_pairs,
                                           worker_type=a.concurrent_type,
                                           max_workers=a.n_concurrent,
                                           skip_if_exists=True,
                                           download_kwargs={'raw_download': C.USE_RAW_DOWNLOADS}))

        n_attempted += len(batch)
        for blob, result in zip(batch, batch_results):
            if isinstance(result, Exception):
                n_failed += 1
                print(f"*** failed to download {blob.name} due to {result}")
                # the destination was written with the node number stripped (see blob_file_pairs above)
                blob_path = os.path.join(a.local_storage_rootdir, strip_blob_node_number(blob.name))
                if os.path.exists(blob_path):
                    os.remove(blob_path)  # gcp leaves partial files behind
            else:  # success
                n_succeeded += 1
                blob_index = blob_list.index(blob)  # it's gotta be in there
                blob_list[blob_index].have = True
        n_batch += 1

    return (blob_list,
            {'n_attempted': n_attempted, 'n_succeeded': n_succeeded, 'n_failed': n_failed,
             'n_skipped_because_present': n_skipped_because_present, 'n_batches': n_batch})


# Making glob names and blobs from patterns, stream kind, timestamp prefix, interval start and end times, as needed

def get_blob_glob(stream, node_number, timestamp_prefix):
    """Return the blob glob (path in bucket) given the stream kind, the node number, and the timestamp _prefix_

    This glob, passed to gcp, will return _all_ files of that stream and node and with that timestamp prefix. This
    will probably be more files than you want as the timestamp prefix is more general than the interval specified.
    """
    bodged_node_number = node_number + 3  # don't know why but node#s are bumped by 3 in filenames in gcp
    pattern = g.stream_patterns[stream]
    print(f"get_blob_glob - pattern: {pattern}")
    filled_pattern = pattern.format(bodged_node_number, timestamp_prefix)
    print(f"              - expanded: {filled_pattern}")
    return filled_pattern


def get_interval_names_from_interval_and_pattern(stream, node_number):
    """Return the blob path (in bucket) of the earliest and latest stream file from given node in the interval"""
    bodged_node_number = node_number + 3
    pattern = g.stream_patterns[stream]
    print(f"get_interval_names_from_interval_and_pattern - pattern: {pattern}")
    first_name = pattern.format(bodged_node_number, g.interval_start_time_with_T + ' ')
    last_name = pattern.format(bodged_node_number, g.interval_end_time_with_T + '~')
    print(f"  - first_name: {first_name}")
    print(f"  - last_name: {last_name}")
    return first_name, last_name


# Reformat the blob filename for various reasons - but for one thing, we frequently want the blob filename _without_
# the (gcp supplied) "file generation number"

def split_blob_id(id):
    with_generation_stripped = id.rpartition('/')[0]
    timestamp_extracted = re.search('\\d{4}-\\d{2}-\\d{2}T\\d{2}_\\d{2}_\\d{2}[.]\\d*', with_generation_stripped,
                                    flags=0)
    return with_generation_stripped, timestamp_extracted.group()


def format_blob_id(id):
    """Format the blob filename as its path followed by its timestamp"""
    path, timestamp = split_blob_id(id)
    return f"{path} (@{timestamp})"


def get_blob_filename(id):
    """Return the blob filename with bucket and generation# stripped"""
    with_generation_stripped, _ = split_blob_id(id)
    with_bucket_stripped = with_generation_stripped.removeprefix(g.gcp_export_bucket + '/')
    return with_bucket_stripped


def strip_blob_node_number(id):
    return re.sub(g.match_node_number_re, '', id)


# This is to suppress the warning that the crc32c module can't load the native crc
implementation. Unfortunately +# this message only works if the worker type is thread. If process, the subprocesses doing the downloads don't have +# this setting transmitted to them. +# * Also I don't know why the native crc32c implementation doesn't load on my laptop. It's not really time critical +# though, so IDC +warnings.filterwarnings('ignore', message="As the c extension couldn't be imported", category=RuntimeWarning) diff --git a/hedera-node/test-clients/scripts/diff-testing/globals.py b/hedera-node/test-clients/scripts/diff-testing/globals.py new file mode 100644 index 000000000000..ebe7f8fbb69d --- /dev/null +++ b/hedera-node/test-clients/scripts/diff-testing/globals.py @@ -0,0 +1,56 @@ +# Copyright (C) 2024 Hedera Hashgraph, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from enum import Enum +from google.api_core.retry import Retry +from re import Pattern + + +class Streams(Enum): + EVENTS = 1 + RECORDS = 2 + SIDECARS = 3 + + +class Globals: + interval_start_time_with_T: str + interval_end_time_with_T: str + interval_start_time_as_filename_component: str + interval_end_time_as_filename_component: str + interval_common_prefix: str + n_nodes: int + gcp_project: str + network: str + gcp_export_bucket: str + stream_patterns: dict[Streams, str] + match_node_number_re: Pattern + mainnet_hostnames: dict[int, str] + gcp_retry_instance: Retry + + def __str__(self): + return f"""Derived globals: + interval_start_time_with_T: {self.interval_start_time_with_T} + interval_end_time_with_T: {self.interval_end_time_with_T} + interval_start_time_as_filename_component: {self.interval_start_time_as_filename_component} + interval_end_time_as_filename_component: {self.interval_end_time_as_filename_component} + interval_common_prefix: {self.interval_common_prefix} + n_nodes: {self.n_nodes} + gcp_project: {self.gcp_project} + network: {self.network} + gcp_export_bucket: {self.gcp_export_bucket} + stream_patterns: {self.stream_patterns} +""" + + +g: Globals = Globals() diff --git a/hedera-node/test-clients/scripts/diff-testing/main.py b/hedera-node/test-clients/scripts/diff-testing/main.py new file mode 100644 index 000000000000..35215a8a0d09 --- /dev/null +++ b/hedera-node/test-clients/scripts/diff-testing/main.py @@ -0,0 +1,250 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2024 Hedera Hashgraph, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+""" Fetch record streams, event streams, and state files for Hedera mainnet """
+
+import copy
+
+from bloblist import Blob, BlobList
+from cli_arguments import Args, a
+from gcp_access import *
+from globals import Streams, g
+from utils import *
+
+
+# Configuration setup functions
+
+def set_derived_configuration():
+    """Set configuration settings that are derived, by some formula, from other configuration settings"""
+
+    g.interval_start_time_with_T = a.interval_start_time.isoformat('T', 'seconds')
+    g.interval_end_time_with_T = a.interval_end_time.isoformat('T', 'seconds')
+
+    g.interval_start_time_as_filename_component = g.interval_start_time_with_T.replace(':', '_')
+    g.interval_end_time_as_filename_component = g.interval_end_time_with_T.replace(':', '_')
+    g.interval_common_prefix = os.path.commonprefix(
+        [g.interval_start_time_as_filename_component, g.interval_end_time_as_filename_component])
+
+    if not g.interval_common_prefix:
+        print("*** Possible problem: interval_common_prefix is empty; will match all timestamps, no filtering")
+
+
+def set_fixed_configuration():
+    """Set configuration settings that are so fixed you can just put them in the code"""
+    g.n_nodes = C.N_NODES
+
+
+def set_default_paths_and_patterns():
+    """Set default paths and path patterns (path patterns: GCP patterns for Hedera stream files)"""
+    g.gcp_project = C.GCP_PROJECT
+    g.network = C.NETWORK
+    g.gcp_export_bucket = C.GCP_EXPORT_BUCKET
+
+    # each pattern has two arguments in this order: ( node_number, timestamp_common_prefix )
+    g.stream_patterns = {Streams.EVENTS: 'eventsStreams/events_0.0.{}/{}*.evts',
+                         Streams.RECORDS: 'recordstreams/record0.0.{}/{}*.rcd.gz',
+                         Streams.SIDECARS: 'recordstreams/record0.0.{}/sidecar/{}*.rcd.gz'}
+
+    g.match_node_number_re = re.compile(r'_?0[.]0[.][0-9]{1,2}')
+
+
+def read_mainnet_hostnames():
+    """Read the map from node# to hostnames, from a file"""
+    g.mainnet_hostnames_path = f"{a.local_storage_rootdir}/{a.hostnamemap_filename}"
+    with open(g.mainnet_hostnames_path, 'r') as f:
+        lines = f.readlines()
+    lines = [line.removeprefix('node') for line in lines]
+    lines = [line.split() for line in lines]
+    lines = [(int(line[0]), line[1]) for line in lines]
+    g.mainnet_hostnames = dict(lines)
+
+
+def get_ready_to_go():
+    """Get all configuration settings, or provide defaults"""
+    set_derived_configuration()
+    set_fixed_configuration()
+    set_default_paths_and_patterns()
+    read_mainnet_hostnames()
+
+
+# node# <=> hostname mapping functions
+
+def get_name_of_node(node_number):
+    return g.mainnet_hostnames[node_number]
+
+
+def get_number_of_node(node_name):
+    """Given a host's name, return the number of its node.
+
+    In this script - throughout, not just here - a node is always identified by its number (0..29/30),
+    whether it is called a number or an id or whatever ..."""
+    return next(k for k, v in g.mainnet_hostnames.items() if v == node_name)
+
+
+# Handle lists of available blobs in gcp
+
+def get_blobs_for_stream(bucket, stream):
+    """Return names of all blobs (as gcp Blobs) in the bucket for the given stream that are in the desired interval"""
+    blob_glob = get_blob_glob(stream, a.node, g.interval_common_prefix)
+    first_and_last_names = get_interval_names_from_interval_and_pattern(stream, a.node)
+
+    print(f"getting blobs for {stream} with glob {blob_glob}")
+    all_prefixed_blobs = get_blobs_in_bucket(bucket, blob_glob, first_and_last_names=first_and_last_names)
+    print(f"found {len(all_prefixed_blobs)} matching blobs")
+    print(f" first: {format_blob_id(all_prefixed_blobs[0].id)}")
+    print(f" last: {format_blob_id(all_prefixed_blobs[-1].id)}")
+
+    return all_prefixed_blobs
+
+
+def map_gcp_blob_to_blob(stream_name, blobs):
+    """Return list of (local) Blobs from list of gcp Blobs"""
+    return [Blob(type=stream_name, name=get_blob_filename(b.id), have=False, node=a.node, size=b.size,
+                 
crc32c=b.crc32c, md5hash=b.md5_hash) for b in blobs]
+
+
+def get_blob_names_in_interval(bucket):
+    """Return, from gcp, all the names of all the stream files (blobs) in the time interval specified"""
+    print(f"get_blob_names_in_interval:")
+    print(f"interval_start_time: {g.interval_start_time_with_T}")
+    print(f"interval_end_time: {g.interval_end_time_with_T}")
+
+    event_blobs = map_gcp_blob_to_blob("event", get_blobs_for_stream(bucket, Streams.EVENTS))
+    record_blobs = map_gcp_blob_to_blob("record", get_blobs_for_stream(bucket, Streams.RECORDS))
+    sidecar_blobs = map_gcp_blob_to_blob("sidecar", get_blobs_for_stream(bucket, Streams.SIDECARS))
+
+    all_blobs = event_blobs + record_blobs + sidecar_blobs
+
+    print(f"Have {len(all_blobs)} blobs ({len(event_blobs)} events, " +
+          f"{len(record_blobs)} records, {len(sidecar_blobs)} sidecars)")
+
+    return all_blobs
+
+
+def get_blob_names(bucket):
+    all_blobs = get_blob_names_in_interval(bucket)
+    blob_list = BlobList(interval_start_time=a.interval_start_time,
+                         interval_end_time=a.interval_end_time,
+                         nodes={a.node},
+                         blobs=all_blobs)
+    print()
+    return blob_list
+
+
+def load_bloblist(blob_list_path):
+    blob_list = BlobList.load(blob_list_path)
+    print(f"loaded {len(blob_list.blobs)} blobs " +
+          f"from {blob_list.interval_start_time} to {blob_list.interval_end_time}, " +
+          f"nodes: {set_to_csv(blob_list.nodes)}")
+    return blob_list
+
+
+def merge_blob_lists(base_blob_list, new_blob_list):
+    """Merge new_blob_list into base_blob_list"""
+    print(f"merge_blob_lists: base has {len(base_blob_list)} blobs from nodes {set_to_csv(base_blob_list.nodes)}, " +
+          f"new has {len(new_blob_list)} from nodes {set_to_csv(new_blob_list.nodes)}")
+
+    base_blob_names = {strip_blob_node_number(b.name) for b in base_blob_list.blobs}
+
+    merged = copy.deepcopy(base_blob_list)
+
+    n_merged = 0
+    for b in new_blob_list.blobs:
+        if strip_blob_node_number(b.name) not in base_blob_names:
+            n_merged += 1
+            merged.blobs.append(copy.copy(b))
+
+    print(f"merged 
{n_merged} new blobs")
+    return merged
+
+
+def get_blob_path(filename):
+    return os.path.join(a.local_storage_rootdir, strip_blob_node_number(filename))
+
+
+def main():
+    global a
+    a = Args.parse()
+    print(a)
+
+    get_ready_to_go()
+
+    print(g)
+
+    gcp_storage, bucket = init_gcp_and_get_streams_bucket()
+
+    blob_list_path = f"{a.local_storage_rootdir}/{a.bloblist_filename}"
+
+    match a.action:
+        case 'get_blob_names':
+            blob_list = get_blob_names(bucket)
+            BlobList.save(blob_list_path, blob_list)
+
+        case 'load_bloblist':
+            blob_list = load_bloblist(blob_list_path)
+            print(f"as object: \n", blob_list)
+
+        case 'reget_blob_names':
+            # first load bloblist
+            original_blob_list = load_bloblist(blob_list_path)
+
+            # then get blob names for given node
+            new_blob_list = get_blob_names(bucket)
+
+            # then _merge_ by adding newly fetched names that were missing in the loaded bloblist
+            merged_blob_list = merge_blob_lists(original_blob_list, new_blob_list)
+
+            # then write the bloblist again (keeping old version by renaming it)
+            BlobList.save(blob_list_path, merged_blob_list)
+
+        case 'download_blobs':
+            # get the list of blobs we want
+            blob_list = load_bloblist(blob_list_path)
+            print(f"blob list has {len(blob_list.blobs)} blobs")
+
+            # get missing blobs
+            dont_have_blobs = {b for b in blob_list.blobs if not b.have}
+            print(f"{len(dont_have_blobs)} missing blobs, of {len(blob_list.blobs)}")
+
+            # see if any of those blobs actually are present on disk
+            actually_present_blobs = {b for b in dont_have_blobs if os.path.isfile(get_blob_path(b.name))}
+            # check the actually present files were fully downloaded (ideally we'd check the CRC/MD5 instead)
+            mismatched_size_blobs = {b for b in actually_present_blobs
+                                     if os.path.getsize(get_blob_path(b.name)) != b.size}
+            if len(actually_present_blobs) > 0:
+                newline_and_indent = '\n '
+                print(f"{len(actually_present_blobs)} are marked 'have=False' yet exist on disk:" +
+                      f"{newline_and_indent.join([b.name for b in 
actually_present_blobs])}")
+            if len(mismatched_size_blobs) > 0:
+                print(f"{len(mismatched_size_blobs)}, marked 'have=False', exist on disk but have the wrong size")
+                # If mismatched, remove the file and try again (maybe the script crashed previously, or was interrupted with ^C)
+                for blob in mismatched_size_blobs:
+                    os.remove(get_blob_path(blob.name))
+
+            print(f"batch size: {a.n_batch_size}")
+            result_blob_list, counters = download_blobs(bucket, dont_have_blobs, actually_present_blobs, a)
+
+            BlobList.save(blob_list_path, blob_list)
+
+            print(f"""Downloaded {counters['n_batches']} batches:
+    {counters['n_attempted']} blobs attempted
+    {counters['n_skipped_because_present']} blobs skipped because they were already present
+    {counters['n_succeeded']} blob downloads succeeded
+    {counters['n_failed']} blobs failed to download""")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/hedera-node/test-clients/scripts/diff-testing/requirements.txt b/hedera-node/test-clients/scripts/diff-testing/requirements.txt
new file mode 100644
index 000000000000..89bbbc6aa982
--- /dev/null
+++ b/hedera-node/test-clients/scripts/diff-testing/requirements.txt
@@ -0,0 +1,45 @@
+# Copyright (C) 2024 Hedera Hashgraph, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +cachetools==5.3.2 +certifi==2024.2.2 +cffi==1.16.0 +charset-normalizer==3.3.2 +cryptography==42.0.2 +Deprecated==1.2.14 +docstring-parser==0.15 +google-api-core==2.16.2 +google-auth==2.27.0 +google-cloud-core==2.4.1 +google-cloud-storage==2.14.0 +google-crc32c==1.5.0 +google-resumable-media==2.7.0 +googleapis-common-protos==1.62.0 +idna==3.6 +jsons==1.6.3 +mypy-extensions==1.0.0 +protobuf==4.25.2 +pyasn1==0.5.1 +pyasn1-modules==0.3.0 +pycparser==2.21 +PyJWT==2.8.0 +PyNaCl==1.5.0 +requests==2.31.0 +rsa==4.9 +typed-argument-parser==1.9.0 +typing-inspect==0.9.0 +typing_extensions==4.9.0 +typish==1.9.3 +urllib3==2.2.0 +wrapt==1.16.0 diff --git a/hedera-node/test-clients/scripts/diff-testing/utils.py b/hedera-node/test-clients/scripts/diff-testing/utils.py new file mode 100644 index 000000000000..5cc90a75de65 --- /dev/null +++ b/hedera-node/test-clients/scripts/diff-testing/utils.py @@ -0,0 +1,31 @@ +# Copyright (C) 2024 Hedera Hashgraph, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +def naiveify(dt): + """ + Takes a datetime object and strips its timezone (works best if it is a UTC datetime, but doesn't check that) + :param dt: a datetime with a timezone (preferably UTC) + :return: same datetime but "naive" (without timezone) + """ + return dt.replace(tzinfo=None) + + +def set_to_csv(s): + return ', '.join( [str(e) for e in sorted(s)]) + + +def split_list(batch_size, a): + """Split a list into batches of the given size (last batch may be short)""" + for i in range(0, len(a), batch_size): + yield a[i:i+batch_size] diff --git a/hedera-node/test-clients/scripts/diff-testing/versioned_output_file.py b/hedera-node/test-clients/scripts/diff-testing/versioned_output_file.py new file mode 100644 index 000000000000..fc1efb2bbf69 --- /dev/null +++ b/hedera-node/test-clients/scripts/diff-testing/versioned_output_file.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python + +# Copyright (C) 2024 Hedera Hashgraph, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""This module provides a versioned output file. + +When you write to such a file, it saves a versioned backup of any +existing file contents. + +For usage examples see main(). + +From https://code.activestate.com/recipes/52277-saving-backups-when-writing-files/, +by Mitch Chapman, Python Software Foundation (PSF) license; a couple of bugs fixed, +marshalling code removed, and context manager support added. 
+""" + +import sys, os, glob + + +class VersionedOutputFile: + """This is like a file object opened for output, but it makes + versioned backups of anything it might otherwise overwrite. + Opens files for 'w' only (at this time).""" + + def __init__(self, pathname, numSavedVersions=3): + """Create a new output file. + + `pathname' is the name of the file to [over]write. + `numSavedVersions' tells how many of the most recent versions + of `pathname' to save.""" + + self._pathname = pathname + self._tmpPathname = "%s.~new~" % self._pathname + self._numSavedVersions = numSavedVersions + self._outf = open(self._tmpPathname, "w", encoding="utf-8") + + def __enter__(self): + return self; + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + + def __del__(self): + self.close() + + def close(self): + if self._outf: + self._outf.close() + self._replaceCurrentFile() + self._outf = None + + def asFile(self): + """Return self's shadowed file object, since marshal is + pretty insistent on working w. pure file objects.""" + return self._outf + + def __getattr__(self, attr): + """Delegate most operations to self's open file object.""" + return getattr(self.__dict__['_outf'], attr) + + def _replaceCurrentFile(self): + """Replace the current contents of self's named file.""" + self._backupCurrentFile() + os.rename(self._tmpPathname, self._pathname) + + def _backupCurrentFile(self): + """Save a numbered backup of self's named file.""" + # If the file doesn't already exist, there's nothing to do. + if os.path.isfile(self._pathname): + newName = self._versionedName(self._currentRevision() + 1) + os.rename(self._pathname, newName) + + # Maybe get rid of old versions. 
+ if ((self._numSavedVersions is not None) and + (self._numSavedVersions > 0)): + self._deleteOldRevisions() + + def _versionedName(self, revision): + """Get self's pathname with a revision number appended.""" + return "%s.~%s~" % (self._pathname, revision) + + def _currentRevision(self): + """Get the revision number of self's largest existing backup.""" + revisions = [0] + self._revisions() + return max(revisions) + + def _revisions(self): + """Get the revision numbers of all of self's backups.""" + + revisions = [] + backupNames = glob.glob("%s.~[0-9]*~" % (self._pathname)) + for name in backupNames: + try: + revision = int(name.split("~")[-2]) + revisions.append(revision) + except ValueError: + # Some ~[0-9]*~ extensions may not be wholly numeric. + pass + revisions.sort() + return revisions + + def _deleteOldRevisions(self): + """Delete old versions of self's file, so that at most + self._numSavedVersions versions are retained.""" + + revisions = self._revisions() + revisionsToDelete = revisions[:-self._numSavedVersions] + for revision in revisionsToDelete: + pathname = self._versionedName(revision) + if os.path.isfile(pathname): + os.remove(pathname) + + +def main(): + """Module mainline (for isolation testing)""" + basename = "TestFile.txt" + if os.path.exists(basename): + os.remove(basename) + for i in range(10): + outf = VersionedOutputFile(basename) + outf.write("This is version %s.\n" % i) + outf.close() + + # Now there should be just four versions of TestFile.txt: + expectedSuffixes = ["", ".~7~", ".~8~", ".~9~"] + expectedVersions = [] + for suffix in expectedSuffixes: + expectedVersions.append("%s%s" % (basename, suffix)) + matchingFiles = glob.glob("%s*" % basename) + for filename in matchingFiles: + if filename not in expectedVersions: + sys.stderr.write("Found unexpected file %s.\n" % filename) + else: + # Unit tests should clean up after themselves... 
+            os.remove(filename)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/platform-sdk/swirlds-merkle/src/test/java/com/swirlds/virtual/merkle/reconnect/BreakableDataSource.java b/platform-sdk/swirlds-merkle/src/test/java/com/swirlds/virtual/merkle/reconnect/BreakableDataSource.java
index 185cd2f0aa2f..069bfe49ad5d 100644
--- a/platform-sdk/swirlds-merkle/src/test/java/com/swirlds/virtual/merkle/reconnect/BreakableDataSource.java
+++ b/platform-sdk/swirlds-merkle/src/test/java/com/swirlds/virtual/merkle/reconnect/BreakableDataSource.java
@@ -52,6 +52,8 @@ public void saveRecords(
         final List> leaves = leafRecordsToAddOrUpdate.toList();

         if (builder.numTimesBroken < builder.numTimesToBreak) {
+            // A synchronization block is not required here, as this code is never called in parallel
+            // (though it may be called from different threads). The `volatile` modifier is sufficient to ensure visibility.
             builder.numCalls += leaves.size();
             if (builder.numCalls > builder.numCallsBeforeThrow) {
                 builder.numCalls = 0;
diff --git a/platform-sdk/swirlds-merkle/src/test/java/com/swirlds/virtual/merkle/reconnect/BrokenBuilder.java b/platform-sdk/swirlds-merkle/src/test/java/com/swirlds/virtual/merkle/reconnect/BrokenBuilder.java
index be30c90281cd..aeb31f97b513 100644
--- a/platform-sdk/swirlds-merkle/src/test/java/com/swirlds/virtual/merkle/reconnect/BrokenBuilder.java
+++ b/platform-sdk/swirlds-merkle/src/test/java/com/swirlds/virtual/merkle/reconnect/BrokenBuilder.java
@@ -31,10 +31,13 @@ public final class BrokenBuilder implements VirtualDataSourceBuilder

     VirtualDataSourceBuilder delegate;

-    int numCallsBeforeThrow = Integer.MAX_VALUE;
-    int numCalls = 0;
-    int numTimesToBreak = 0;
-    int numTimesBroken = 0;
+    volatile int numCallsBeforeThrow = Integer.MAX_VALUE;
+    volatile int numTimesToBreak = 0;
+
+    // the following fields are volatile to ensure visibility,
+    // as BreakableDataSource.saveRecords is called from multiple threads.
+ volatile int numCalls = 0; + volatile int numTimesBroken = 0; public BrokenBuilder() {} diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java index 51f1ce30792e..8039dadb9090 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java @@ -156,12 +156,8 @@ import com.swirlds.platform.system.address.Address; import com.swirlds.platform.system.address.AddressBook; import com.swirlds.platform.system.address.AddressBookUtils; -import com.swirlds.platform.system.state.notifications.IssListener; -import com.swirlds.platform.system.state.notifications.IssNotification; -import com.swirlds.platform.system.state.notifications.IssNotification.IssType; import com.swirlds.platform.system.status.PlatformStatus; import com.swirlds.platform.system.status.PlatformStatusManager; -import com.swirlds.platform.system.status.actions.CatastrophicFailureAction; import com.swirlds.platform.system.status.actions.DoneReplayingEventsAction; import com.swirlds.platform.system.status.actions.ReconnectCompleteAction; import com.swirlds.platform.system.status.actions.StartedReplayingEventsAction; @@ -170,6 +166,8 @@ import com.swirlds.platform.util.PlatformComponents; import com.swirlds.platform.wiring.NoInput; import com.swirlds.platform.wiring.PlatformWiring; +import com.swirlds.platform.wiring.components.IssDetectorWiring; +import com.swirlds.platform.wiring.components.StateAndRound; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; import java.io.IOException; @@ -177,11 +175,9 @@ import java.nio.file.Path; import java.util.List; import java.util.Objects; -import java.util.Set; import java.util.concurrent.ExecutionException; import 
java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongSupplier; import org.apache.logging.log4j.LogManager; @@ -291,7 +287,7 @@ public class SwirldsPlatform implements Platform { */ private final PlatformWiring platformWiring; /** thread-queue responsible for hashing states */ - private final QueueThread stateHashSignQueue; + private final QueueThread stateHashSignQueue; private final AncientMode ancientMode; @@ -522,11 +518,6 @@ public class SwirldsPlatform implements Platform { components.add(stateManagementComponent); - final BiConsumer roundAndStateConsumer = (state, round) -> { - platformWiring.getIssDetectorWiring().handleConsensusRound().put(round); - platformWiring.getSignatureCollectorConsensusInput().put(round); - }; - // FUTURE WORK remove this when there are no more ShutdownRequestedTriggers being dispatched components.add(new Shutdown()); @@ -561,23 +552,40 @@ public class SwirldsPlatform implements Platform { platformContext, currentAddressBook, selfId, - roundAndStateConsumer, new SwirldStateMetrics(platformContext.getMetrics()), platformStatusManager, initialState.getState(), appVersion); - final InterruptableConsumer newSignedStateFromTransactionsConsumer = rs -> { - latestImmutableState.setState(rs.getAndReserve("newSignedStateFromTransactionsConsumer")); - latestCompleteState.newIncompleteState(rs.get().getRound()); - savedStateController.markSavedState(rs.getAndReserve("savedStateController.markSavedState")); + // FUTURE WORK: the lambda is an intermediate step toward passing the state and round over wires + // This is the handler method for the stateHashSignQueue, which the ConsensusRoundHandler pushes data onto. 
+ final InterruptableConsumer newSignedStateFromTransactionsConsumer = stateAndRound -> { + final ReservedSignedState state = stateAndRound.reservedSignedState(); + final long roundNumber = state.get().getRound(); + final ConsensusRound consensusRound = stateAndRound.round(); + + latestImmutableState.setState(state.getAndReserve("newSignedStateFromTransactionsConsumer")); + latestCompleteState.newIncompleteState(roundNumber); + savedStateController.markSavedState(state.getAndReserve("savedStateController.markSavedState")); + + // FUTURE WORK: this is where the state is currently being hashed. State hashing will be moved into a + // separate component. At that time, all subsequent method calls in this lambda will be wired to receive + // data from the hasher, since they require a strong guarantee that the state has been hashed. stateManagementComponent.newSignedStateFromTransactions( - rs.getAndReserve("stateManagementComponent.newSignedStateFromTransactions")); - platformWiring.getIssDetectorWiring().newStateHashed().put(rs.getAndReserve("issDetector")); - rs.close(); + state.getAndReserve("stateManagementComponent.newSignedStateFromTransactions")); + + final IssDetectorWiring issDetectorWiring = platformWiring.getIssDetectorWiring(); + // FUTURE WORK: these three method calls will be combined into a single method call + issDetectorWiring.roundCompletedInput().put(roundNumber); + issDetectorWiring.newStateHashed().put(state.getAndReserve("issDetector")); + issDetectorWiring.handleConsensusRound().put(consensusRound); + + platformWiring.getSignatureCollectorConsensusInput().put(consensusRound); + + stateAndRound.reservedSignedState().close(); }; - stateHashSignQueue = components.add(new QueueThreadConfiguration(threadManager) + stateHashSignQueue = components.add(new QueueThreadConfiguration(threadManager) .setNodeId(selfId) .setComponent(PLATFORM_THREAD_POOL_NAME) .setThreadName("state_hash_sign") @@ -592,7 +600,6 @@ public class SwirldsPlatform implements Platform 
{ stateHashSignQueue, eventDurabilityNexus::waitUntilDurable, platformStatusManager, - platformWiring.getIssDetectorWiring().roundCompletedInput()::put, appVersion); final PcesSequencer sequencer = new PcesSequencer(); @@ -635,23 +642,13 @@ public class SwirldsPlatform implements Platform { platformStatusManager::getCurrentStatus, latestReconnectRound::get); - platformWiring.wireExternalComponents(platformStatusManager, transactionPool, latestCompleteState); + platformWiring.wireExternalComponents( + platformStatusManager, transactionPool, latestCompleteState, notificationEngine); final FutureEventBuffer futureEventBuffer = new FutureEventBuffer(platformContext); - // wire ISS output final IssHandler issHandler = new IssHandler(stateConfig, this::haltRequested, this::handleFatalError, issScratchpad); - final OutputWire issOutput = - platformWiring.getIssDetectorWiring().issNotificationOutput(); - issOutput.solderTo( - "issNotificationEngine", "ISS notification", n -> notificationEngine.dispatch(IssListener.class, n)); - issOutput.solderTo("statusManager_submitCatastrophicFailure", "ISS notification", n -> { - if (Set.of(IssType.SELF_ISS, IssType.CATASTROPHIC_ISS).contains(n.getIssType())) { - platformStatusManager.submitStatusAction(new CatastrophicFailureAction()); - } - }); - issOutput.solderTo("issHandler", "ISS notification", issHandler::issObserved); final OutputWire stateSavingResultOutput = platformWiring.getStateSavingResultOutput(); stateSavingResultOutput.solderTo( @@ -689,6 +686,7 @@ public class SwirldsPlatform implements Platform { eventStreamManager, futureEventBuffer, issDetector, + issHandler, hashLogger, latestCompleteStateNotifier); @@ -787,10 +785,10 @@ public class SwirldsPlatform implements Platform { } final Clearable clearStateHashSignQueue = () -> { - ReservedSignedState signedState = stateHashSignQueue.poll(); - while (signedState != null) { - signedState.close(); - signedState = stateHashSignQueue.poll(); + StateAndRound stateAndRound = 
stateHashSignQueue.poll(); + while (stateAndRound != null) { + stateAndRound.reservedSignedState().close(); + stateAndRound = stateHashSignQueue.poll(); } }; diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/cli/DiagramCommand.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/cli/DiagramCommand.java index 395ace443d30..fcb6b5f123e1 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/cli/DiagramCommand.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/cli/DiagramCommand.java @@ -16,6 +16,8 @@ package com.swirlds.platform.cli; +import static com.swirlds.common.threading.manager.AdHocThreadManager.getStaticThreadManager; + import com.swirlds.base.time.Time; import com.swirlds.cli.PlatformCli; import com.swirlds.cli.utility.AbstractCommand; @@ -24,11 +26,17 @@ import com.swirlds.common.context.PlatformContext; import com.swirlds.common.crypto.CryptographyHolder; import com.swirlds.common.metrics.noop.NoOpMetrics; +import com.swirlds.common.notification.NotificationEngine; +import com.swirlds.common.threading.manager.ThreadManager; import com.swirlds.common.wiring.model.ModelEdgeSubstitution; import com.swirlds.common.wiring.model.ModelGroup; import com.swirlds.common.wiring.model.ModelManualLink; import com.swirlds.config.api.Configuration; import com.swirlds.platform.config.DefaultConfiguration; +import com.swirlds.platform.config.StateConfig; +import com.swirlds.platform.eventhandling.TransactionPool; +import com.swirlds.platform.state.nexus.LatestCompleteStateNexus; +import com.swirlds.platform.system.status.PlatformStatusManager; import com.swirlds.platform.wiring.PlatformWiring; import edu.umd.cs.findbugs.annotations.NonNull; import java.io.IOException; @@ -93,6 +101,16 @@ public Integer call() throws IOException { final PlatformWiring platformWiring = new PlatformWiring(platformContext, Time.getCurrent()); + final ThreadManager threadManager = 
getStaticThreadManager(); + final NotificationEngine notificationEngine = NotificationEngine.buildEngine(threadManager); + platformWiring.wireExternalComponents( + new PlatformStatusManager(platformContext, platformContext.getTime(), threadManager, a -> {}), + new TransactionPool(platformContext), + new LatestCompleteStateNexus( + platformContext.getConfiguration().getConfigData(StateConfig.class), + platformContext.getMetrics()), + notificationEngine); + final String diagramString = platformWiring .getModel() .generateWiringDiagram(parseGroups(), parseSubstitutions(), parseManualLinks()); diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/eventhandling/ConsensusRoundHandler.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/eventhandling/ConsensusRoundHandler.java index a9ec2c52e8c0..c0101abafd9d 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/eventhandling/ConsensusRoundHandler.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/eventhandling/ConsensusRoundHandler.java @@ -21,7 +21,6 @@ import static com.swirlds.platform.eventhandling.ConsensusRoundHandlerPhase.GETTING_STATE_TO_SIGN; import static com.swirlds.platform.eventhandling.ConsensusRoundHandlerPhase.HANDLING_CONSENSUS_ROUND; import static com.swirlds.platform.eventhandling.ConsensusRoundHandlerPhase.IDLE; -import static com.swirlds.platform.eventhandling.ConsensusRoundHandlerPhase.MARKING_ROUND_COMPLETE; import static com.swirlds.platform.eventhandling.ConsensusRoundHandlerPhase.SETTING_EVENT_CONSENSUS_DATA; import static com.swirlds.platform.eventhandling.ConsensusRoundHandlerPhase.UPDATING_PLATFORM_STATE; import static com.swirlds.platform.eventhandling.ConsensusRoundHandlerPhase.UPDATING_PLATFORM_STATE_RUNNING_HASH; @@ -45,15 +44,14 @@ import com.swirlds.platform.state.PlatformState; import com.swirlds.platform.state.State; import com.swirlds.platform.state.SwirldStateManager; 
-import com.swirlds.platform.state.signed.ReservedSignedState; import com.swirlds.platform.state.signed.SignedState; import com.swirlds.platform.system.SoftwareVersion; import com.swirlds.platform.system.status.StatusActionSubmitter; import com.swirlds.platform.system.status.actions.FreezePeriodEnteredAction; +import com.swirlds.platform.wiring.components.StateAndRound; import edu.umd.cs.findbugs.annotations.NonNull; import java.util.Objects; import java.util.concurrent.BlockingQueue; -import java.util.function.Consumer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -92,9 +90,9 @@ public class ConsensusRoundHandler { new RunningHash(new ImmutableHash(new byte[DigestType.SHA_384.digestLength()])); /** - * A queue that accepts signed states for hashing and signature collection. + * A queue that accepts signed states and rounds for hashing and signature collection. */ - private final BlockingQueue<ReservedSignedState> stateHashSignQueue; + private final BlockingQueue<StateAndRound> stateHashSignQueue; /** * Enables submitting platform status actions. @@ -103,8 +101,6 @@ public class ConsensusRoundHandler { private final SoftwareVersion softwareVersion; - private final Consumer<Long> roundAppliedToStateConsumer; - /** * A method that blocks until an event becomes durable.
*/ @@ -125,22 +121,20 @@ public class ConsensusRoundHandler { /** * Constructor * - * @param platformContext contains various platform utilities - * @param swirldStateManager the swirld state manager to send events to - * @param stateHashSignQueue the queue thread that handles hashing and collecting signatures of new - * self-signed states - * @param waitForEventDurability a method that blocks until an event becomes durable - * @param statusActionSubmitter enables submitting of platform status actions - * @param roundAppliedToStateConsumer informs the consensus hash manager that a round has been applied to state - * @param softwareVersion the current version of the software + * @param platformContext contains various platform utilities + * @param swirldStateManager the swirld state manager to send events to + * @param stateHashSignQueue the queue thread that handles hashing and collecting signatures of new + * self-signed states + * @param waitForEventDurability a method that blocks until an event becomes durable + * @param statusActionSubmitter enables submitting of platform status actions + * @param softwareVersion the current version of the software */ public ConsensusRoundHandler( @NonNull final PlatformContext platformContext, @NonNull final SwirldStateManager swirldStateManager, - @NonNull final BlockingQueue<ReservedSignedState> stateHashSignQueue, + @NonNull final BlockingQueue<StateAndRound> stateHashSignQueue, @NonNull final CheckedConsumer<GossipEvent, InterruptedException> waitForEventDurability, @NonNull final StatusActionSubmitter statusActionSubmitter, - @NonNull final Consumer<Long> roundAppliedToStateConsumer, @NonNull final SoftwareVersion softwareVersion) { this.platformContext = Objects.requireNonNull(platformContext); @@ -148,7 +142,6 @@ public ConsensusRoundHandler( this.stateHashSignQueue = Objects.requireNonNull(stateHashSignQueue); this.waitForEventDurability = Objects.requireNonNull(waitForEventDurability); this.statusActionSubmitter = Objects.requireNonNull(statusActionSubmitter); -
this.roundAppliedToStateConsumer = Objects.requireNonNull(roundAppliedToStateConsumer); this.softwareVersion = Objects.requireNonNull(softwareVersion); this.roundsNonAncient = platformContext @@ -225,14 +218,10 @@ public void handleConsensusRound(@NonNull final ConsensusRound consensusRound) { handlerMetrics.setPhase(HANDLING_CONSENSUS_ROUND); swirldStateManager.handleConsensusRound(consensusRound); - handlerMetrics.setPhase(MARKING_ROUND_COMPLETE); - // this calls into the ConsensusHashManager - roundAppliedToStateConsumer.accept(consensusRound.getRoundNum()); - handlerMetrics.setPhase(UPDATING_PLATFORM_STATE_RUNNING_HASH); updatePlatformStateRunningHash(consensusRound); - createSignedState(); + createSignedState(consensusRound); } catch (final InterruptedException e) { logger.error(EXCEPTION.getMarker(), "handleConsensusRound interrupted"); Thread.currentThread().interrupt(); @@ -284,9 +273,10 @@ private void updatePlatformStateRunningHash(@NonNull final ConsensusRound round) /** * Create a signed state * + * @param consensusRound the consensus round that resulted in the state being created * @throws InterruptedException if this thread is interrupted */ - private void createSignedState() throws InterruptedException { + private void createSignedState(@NonNull final ConsensusRound consensusRound) throws InterruptedException { if (freezeRoundReceived) { // Let the swirld state manager know we are about to write the saved state for the freeze period swirldStateManager.savedStateInFreezePeriod(); @@ -298,6 +288,8 @@ private void createSignedState() throws InterruptedException { handlerMetrics.setPhase(CREATING_SIGNED_STATE); final SignedState signedState = new SignedState( platformContext, immutableStateCons, "ConsensusRoundHandler.createSignedState()", freezeRoundReceived); - stateHashSignQueue.put(signedState.reserve("ConsensusRoundHandler.createSignedState()")); + + stateHashSignQueue.put( + new 
StateAndRound(signedState.reserve("ConsensusRoundHandler.createSignedState()"), consensusRound)); } } diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/eventhandling/ConsensusRoundHandlerPhase.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/eventhandling/ConsensusRoundHandlerPhase.java index 6532be696646..9eed727414c4 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/eventhandling/ConsensusRoundHandlerPhase.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/eventhandling/ConsensusRoundHandlerPhase.java @@ -49,10 +49,6 @@ public enum ConsensusRoundHandlerPhase { * The platform state is being updated with the running hash of the round. */ UPDATING_PLATFORM_STATE_RUNNING_HASH, - /** - * The consensus hash manager is being informed that the round has been handled. - */ - MARKING_ROUND_COMPLETE, /** * The handler is getting the state to sign. */ diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/state/SwirldStateManager.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/state/SwirldStateManager.java index 1875247c5f40..4ef86a798d65 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/state/SwirldStateManager.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/state/SwirldStateManager.java @@ -38,7 +38,6 @@ import java.time.Instant; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; /** * Manages all interactions with the state object required by {@link SwirldState}. @@ -70,11 +69,6 @@ public class SwirldStateManager implements FreezePeriodChecker, LoadableFromSign */ private final UptimeTracker uptimeTracker; - /** - * Handles system transactions post-consensus - */ - private final BiConsumer roundAndStateConsumer; - /** * The current software version. 
*/ @@ -86,8 +80,6 @@ public class SwirldStateManager implements FreezePeriodChecker, LoadableFromSign * @param platformContext the platform context * @param addressBook the address book * @param selfId this node's id - * @param roundAndStateConsumer consumes a consensus round and the state that results from applying the consensus - * transactions * @param swirldStateMetrics metrics related to SwirldState * @param statusActionSubmitter enables submitting platform status actions * @param state the genesis state @@ -97,7 +89,6 @@ public SwirldStateManager( @NonNull final PlatformContext platformContext, @NonNull final AddressBook addressBook, @NonNull final NodeId selfId, - @NonNull final BiConsumer roundAndStateConsumer, @NonNull final SwirldStateMetrics swirldStateMetrics, @NonNull final StatusActionSubmitter statusActionSubmitter, @NonNull final State state, @@ -106,7 +97,6 @@ public SwirldStateManager( Objects.requireNonNull(platformContext); Objects.requireNonNull(addressBook); Objects.requireNonNull(selfId); - this.roundAndStateConsumer = Objects.requireNonNull(roundAndStateConsumer); this.stats = Objects.requireNonNull(swirldStateMetrics); Objects.requireNonNull(statusActionSubmitter); Objects.requireNonNull(state); @@ -167,7 +157,6 @@ public void handleConsensusRound(final ConsensusRound round) { state.getPlatformState().getUptimeData(), state.getPlatformState().getAddressBook()); transactionHandler.handleRound(round, state); - roundAndStateConsumer.accept(state, round); updateEpoch(); } diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformSchedulers.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformSchedulers.java index e1674b057b89..292d4c6992fe 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformSchedulers.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformSchedulers.java @@ -57,6 
+57,7 @@ * @param runningHashUpdateScheduler the scheduler for the running hash updater * @param futureEventBufferScheduler the scheduler for the future event buffer * @param issDetectorScheduler the scheduler for the iss detector + * @param issHandlerScheduler the scheduler for the iss handler * @param hashLoggerScheduler the scheduler for the hash logger * @param latestCompleteStateScheduler the scheduler for the latest complete state notifier */ @@ -84,6 +85,7 @@ public record PlatformSchedulers( @NonNull TaskScheduler<RunningEventHashUpdate> runningHashUpdateScheduler, @NonNull TaskScheduler<List<GossipEvent>> futureEventBufferScheduler, @NonNull TaskScheduler<List<IssNotification>> issDetectorScheduler, + @NonNull TaskScheduler<Void> issHandlerScheduler, @NonNull TaskScheduler<Void> hashLoggerScheduler, @NonNull TaskScheduler<CompleteStateNotificationWithCleanup> latestCompleteStateScheduler) { @@ -258,6 +260,10 @@ public static PlatformSchedulers create( .withMetricsBuilder(model.metricsBuilder().withUnhandledTaskMetricEnabled(true)) .build() .cast(), + model.schedulerBuilder("issHandler") + .withType(TaskSchedulerType.DIRECT) + .build() + .cast(), model.schedulerBuilder("hashLogger") .withType(config.hashLoggerSchedulerType()) .withUnhandledTaskCapacity(config.hashLoggerUnhandledTaskCapacity()) diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformWiring.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformWiring.java index e6e291017105..81dbf3f1f03d 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformWiring.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformWiring.java @@ -24,6 +24,7 @@ import com.swirlds.base.time.Time; import com.swirlds.common.context.PlatformContext; import com.swirlds.common.io.IOIterator; +import com.swirlds.common.notification.NotificationEngine; import com.swirlds.common.stream.EventStreamManager; import com.swirlds.common.stream.RunningEventHashUpdate; import
com.swirlds.common.utility.Clearable; @@ -59,13 +60,17 @@ import com.swirlds.platform.internal.EventImpl; import com.swirlds.platform.state.SwirldStateManager; import com.swirlds.platform.state.iss.IssDetector; +import com.swirlds.platform.state.iss.IssHandler; import com.swirlds.platform.state.nexus.LatestCompleteStateNexus; import com.swirlds.platform.state.signed.ReservedSignedState; import com.swirlds.platform.state.signed.SignedStateFileManager; import com.swirlds.platform.state.signed.StateDumpRequest; import com.swirlds.platform.state.signed.StateSavingResult; import com.swirlds.platform.state.signed.StateSignatureCollector; +import com.swirlds.platform.system.state.notifications.IssListener; +import com.swirlds.platform.system.state.notifications.IssNotification; import com.swirlds.platform.system.status.PlatformStatusManager; +import com.swirlds.platform.system.status.actions.CatastrophicFailureAction; import com.swirlds.platform.util.HashLogger; import com.swirlds.platform.wiring.components.ApplicationTransactionPrehandlerWiring; import com.swirlds.platform.wiring.components.ConsensusRoundHandlerWiring; @@ -78,6 +83,7 @@ import com.swirlds.platform.wiring.components.GossipWiring; import com.swirlds.platform.wiring.components.HashLoggerWiring; import com.swirlds.platform.wiring.components.IssDetectorWiring; +import com.swirlds.platform.wiring.components.IssHandlerWiring; import com.swirlds.platform.wiring.components.LatestCompleteStateNotifierWiring; import com.swirlds.platform.wiring.components.PcesReplayerWiring; import com.swirlds.platform.wiring.components.PcesSequencerWiring; @@ -89,6 +95,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; import java.time.Duration; import java.util.List; +import java.util.Set; import java.util.concurrent.ForkJoinPool; import java.util.function.LongSupplier; import org.apache.logging.log4j.LogManager; @@ -128,6 +135,7 @@ public class PlatformWiring implements Startable, Stoppable, Clearable { private final 
EventStreamManagerWiring eventStreamManagerWiring; private final RunningHashUpdaterWiring runningHashUpdaterWiring; private final IssDetectorWiring issDetectorWiring; + private final IssHandlerWiring issHandlerWiring; private final HashLoggerWiring hashLoggerWiring; private final LatestCompleteStateNotifierWiring latestCompleteStateNotifierWiring; @@ -214,6 +222,7 @@ public PlatformWiring(@NonNull final PlatformContext platformContext, @NonNull f eventWindowManagerWiring = EventWindowManagerWiring.create(model); issDetectorWiring = IssDetectorWiring.create(model, schedulers.issDetectorScheduler()); + issHandlerWiring = IssHandlerWiring.create(schedulers.issHandlerScheduler()); hashLoggerWiring = HashLoggerWiring.create(schedulers.hashLoggerScheduler()); latestCompleteStateNotifierWiring = @@ -311,6 +320,8 @@ private void wire() { .solderTo(consensusRoundHandlerWiring.runningHashUpdateInput()); runningHashUpdaterWiring.runningHashUpdateOutput().solderTo(eventStreamManagerWiring.runningHashUpdateInput()); + issDetectorWiring.issNotificationOutput().solderTo(issHandlerWiring.issNotificationInput()); + stateSignatureCollectorWiring .getCompleteStatesOutput() .solderTo(latestCompleteStateNotifierWiring.completeStateNotificationInputWire()); @@ -322,13 +333,15 @@ private void wire() { * Future work: as more components are moved to the framework, this method should shrink, and eventually be * removed. 
* - * @param statusManager the status manager to wire - * @param transactionPool the transaction pool to wire + * @param statusManager the status manager to wire + * @param transactionPool the transaction pool to wire + * @param notificationEngine the notification engine to wire */ public void wireExternalComponents( @NonNull final PlatformStatusManager statusManager, @NonNull final TransactionPool transactionPool, - @NonNull final LatestCompleteStateNexus latestCompleteStateNexus) { + @NonNull final LatestCompleteStateNexus latestCompleteStateNexus, + @NonNull final NotificationEngine notificationEngine) { signedStateFileManagerWiring .stateWrittenToDiskOutputWire() @@ -344,6 +357,21 @@ public void wireExternalComponents( stateSignatureCollectorWiring .getCompleteStatesOutput() .solderTo("latestCompleteStateNexus", "complete state", latestCompleteStateNexus::setStateIfNewer); + + issDetectorWiring + .issNotificationOutput() + .solderTo( + "issNotificationEngine", + "ISS notification", + n -> notificationEngine.dispatch(IssListener.class, n)); + issDetectorWiring + .issNotificationOutput() + .solderTo("statusManager_submitCatastrophicFailure", "ISS notification", n -> { + if (Set.of(IssNotification.IssType.SELF_ISS, IssNotification.IssType.CATASTROPHIC_ISS) + .contains(n.getIssType())) { + statusManager.submitStatusAction(new CatastrophicFailureAction()); + } + }); } /** @@ -370,6 +398,7 @@ public void wireExternalComponents( * @param eventStreamManager the event stream manager to bind * @param futureEventBuffer the future event buffer to bind * @param issDetector the ISS detector to bind + * @param issHandler the ISS handler to bind * @param hashLogger the hash logger to bind * @param completeStateNotifier the latest complete state notifier to bind */ @@ -395,6 +424,7 @@ public void bind( @NonNull final EventStreamManager eventStreamManager, @NonNull final FutureEventBuffer futureEventBuffer, @NonNull final IssDetector issDetector, + @NonNull final IssHandler 
issHandler, @NonNull final HashLogger hashLogger, @NonNull final LatestCompleteStateNotifier completeStateNotifier) { @@ -419,6 +449,7 @@ public void bind( eventStreamManagerWiring.bind(eventStreamManager); futureEventBufferWiring.bind(futureEventBuffer); issDetectorWiring.bind(issDetector); + issHandlerWiring.bind(issHandler); hashLoggerWiring.bind(hashLogger); latestCompleteStateNotifierWiring.bind(completeStateNotifier); } } diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/components/IssDetectorWiring.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/components/IssDetectorWiring.java index 33350ec7692a..363afc880fb5 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/components/IssDetectorWiring.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/components/IssDetectorWiring.java @@ -69,7 +69,7 @@ "consensus round", new SystemTransactionExtractor<>(StateSignatureTransaction.class)::handleRound); final InputWire<List<ScopedSystemTransaction<StateSignatureTransaction>>> sigInput = - taskScheduler.buildInputWire("handlePostconsensusSignatures"); + taskScheduler.buildInputWire("post consensus signatures"); roundTransformer.getOutputWire().solderTo(sigInput); return new IssDetectorWiring( taskScheduler.buildInputWire("endOfPcesReplay"), @@ -78,7 +78,7 @@ public static IssDetectorWiring create( sigInput, taskScheduler.buildInputWire("newStateHashed"), taskScheduler.buildInputWire("overridingState"), - taskScheduler.getOutputWire().buildSplitter("issNotificationSplitter", "issNotificationList")); + taskScheduler.getOutputWire().buildSplitter("issNotificationSplitter", "iss notifications")); } /** diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/components/IssHandlerWiring.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/components/IssHandlerWiring.java new file mode
100644 index 000000000000..e12cdeaf3012 --- /dev/null +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/components/IssHandlerWiring.java @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.swirlds.platform.wiring.components; + +import com.swirlds.common.wiring.schedulers.TaskScheduler; +import com.swirlds.common.wiring.wires.input.BindableInputWire; +import com.swirlds.common.wiring.wires.input.InputWire; +import com.swirlds.platform.state.iss.IssHandler; +import com.swirlds.platform.system.state.notifications.IssNotification; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Wiring for the {@link IssHandler} + * + * @param issNotificationInput the input wire for ISS notifications + */ +public record IssHandlerWiring(@NonNull InputWire issNotificationInput) { + /** + * Create a new instance of this wiring + * + * @param taskScheduler the task scheduler to use + * @return the new instance + */ + @NonNull + public static IssHandlerWiring create(@NonNull final TaskScheduler taskScheduler) { + return new IssHandlerWiring(taskScheduler.buildInputWire("iss notification")); + } + + /** + * Bind the input wire to the given handler + * + * @param issHandler the handler to bind to + */ + public void bind(@NonNull final IssHandler issHandler) { + ((BindableInputWire) issNotificationInput).bind(issHandler::issObserved); + } +} diff --git 
a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/components/StateAndRound.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/components/StateAndRound.java new file mode 100644 index 000000000000..0917b026b9b1 --- /dev/null +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/components/StateAndRound.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.swirlds.platform.wiring.components; + +import com.swirlds.platform.internal.ConsensusRound; +import com.swirlds.platform.state.signed.ReservedSignedState; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Contains a reserved signed state, and the consensus round which caused the state to be created + * + * @param reservedSignedState the state + * @param round the round that caused the state to be created + */ +public record StateAndRound(@NonNull ReservedSignedState reservedSignedState, @NonNull ConsensusRound round) {} diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/generate-platform-diagram.sh b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/generate-platform-diagram.sh index 37890941dc83..c9405e9be924 100755 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/generate-platform-diagram.sh +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/generate-platform-diagram.sh @@ -15,14 +15,14 @@ pcli diagram \ -g 'Event Hashing:eventHasher,postHashCollector' \ -g 'Orphan Buffer:orphanBuffer,orphanBufferSplitter' \ -g 'Consensus Engine:consensusEngine,consensusEngineSplitter,eventWindowManager,getKeystoneEventSequenceNumber' \ - -g 'State File Management:saveToDiskFilter,signedStateFileManager,extractOldestMinimumGenerationOnDisk,toStateWrittenToDiskAction' \ + -g 'State File Management:saveToDiskFilter,signedStateFileManager,extractOldestMinimumGenerationOnDisk,toStateWrittenToDiskAction,statusManager_submitStateWritten' \ -g 'State Signature Collection:stateSignatureCollector,reservedStateSplitter,allStatesReserver,completeStateFilter,completeStatesReserver,extractConsensusSignatureTransactions,extractPreconsensusSignatureTransactions' \ -g 'Preconsensus Event Stream:pcesSequencer,pcesWriter,eventDurabilityNexus,🕑' \ -g 'Consensus Event Stream:getEvents,eventStreamManager' \ -g 'Consensus 
Pipeline:inOrderLinker,Consensus Engine,📬,🌀,🚽' \ -g 'Event Creation:futureEventBuffer,futureEventBufferSplitter,eventCreationManager' \ -g 'Gossip:gossip,shadowgraph' \ - -g 'Iss Detector:extractSignaturesForIssDetector,issDetector,issNotificationSplitter' \ + -g 'Iss Detector:extractSignaturesForIssDetector,issDetector,issNotificationSplitter,issHandler,issNotificationEngine,statusManager_submitCatastrophicFailure' \ -g 'Heartbeat:heartbeat,❤️' \ -g 'PCES Replay:pcesReplayer,✅' \ -g 'Transaction Prehandling:applicationTransactionPrehandler,🔮' \ diff --git a/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/eventhandling/ConsensusRoundHandlerTests.java b/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/eventhandling/ConsensusRoundHandlerTests.java index b16ba0646645..0ea80634545c 100644 --- a/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/eventhandling/ConsensusRoundHandlerTests.java +++ b/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/eventhandling/ConsensusRoundHandlerTests.java @@ -16,7 +16,6 @@ package com.swirlds.platform.eventhandling; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -37,15 +36,13 @@ import com.swirlds.platform.state.PlatformState; import com.swirlds.platform.state.State; import com.swirlds.platform.state.SwirldStateManager; -import com.swirlds.platform.state.signed.ReservedSignedState; import com.swirlds.platform.system.SoftwareVersion; import com.swirlds.platform.system.status.StatusActionSubmitter; import com.swirlds.platform.system.status.actions.FreezePeriodEnteredAction; +import com.swirlds.platform.wiring.components.StateAndRound; import edu.umd.cs.findbugs.annotations.NonNull; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.atomic.AtomicLong; -import 
java.util.function.Consumer; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -99,20 +96,16 @@ void normalOperation() throws InterruptedException { final PlatformState platformState = mock(PlatformState.class); final SwirldStateManager swirldStateManager = mockSwirldStateManager(platformState); - final BlockingQueue<ReservedSignedState> stateHashSignQueue = mock(BlockingQueue.class); + final BlockingQueue<StateAndRound> stateHashSignQueue = mock(BlockingQueue.class); final CheckedConsumer<GossipEvent, InterruptedException> waitForEventDurability = mock(CheckedConsumer.class); final StatusActionSubmitter statusActionSubmitter = mock(StatusActionSubmitter.class); - final AtomicLong roundAppliedToState = new AtomicLong(0); - final Consumer<Long> roundAppliedToStateConsumer = roundAppliedToState::set; - final ConsensusRoundHandler consensusRoundHandler = new ConsensusRoundHandler( platformContext, swirldStateManager, stateHashSignQueue, waitForEventDurability, statusActionSubmitter, - roundAppliedToStateConsumer, mock(SoftwareVersion.class)); final EventImpl keystoneEvent = mockEvent(); @@ -129,9 +122,8 @@ void normalOperation() throws InterruptedException { verify(statusActionSubmitter, never()).submitStatusAction(any(FreezePeriodEnteredAction.class)); verify(waitForEventDurability).accept(keystoneEvent.getBaseEvent()); verify(swirldStateManager).handleConsensusRound(consensusRound); - assertEquals(consensusRoundNumber, roundAppliedToState.get()); verify(swirldStateManager, never()).savedStateInFreezePeriod(); - verify(stateHashSignQueue).put(any(ReservedSignedState.class)); + verify(stateHashSignQueue).put(any(StateAndRound.class)); verify(platformState) .setRunningEventHash( events.getLast().getRunningHash().getFutureHash().getAndRethrow()); @@ -146,20 +138,16 @@ void freezeHandling() throws InterruptedException { final SwirldStateManager swirldStateManager = mockSwirldStateManager(platformState); when(swirldStateManager.isInFreezePeriod(any())).thenReturn(true); - final BlockingQueue<ReservedSignedState> stateHashSignQueue = mock(BlockingQueue.class); + final BlockingQueue<StateAndRound> stateHashSignQueue = mock(BlockingQueue.class); final CheckedConsumer<GossipEvent, InterruptedException> waitForEventDurability = mock(CheckedConsumer.class); final StatusActionSubmitter statusActionSubmitter = mock(StatusActionSubmitter.class); - final AtomicLong roundAppliedToState = new AtomicLong(0); - final Consumer<Long> roundAppliedToStateConsumer = roundAppliedToState::set; - final ConsensusRoundHandler consensusRoundHandler = new ConsensusRoundHandler( platformContext, swirldStateManager, stateHashSignQueue, waitForEventDurability, statusActionSubmitter, - roundAppliedToStateConsumer, mock(SoftwareVersion.class)); final EventImpl keystoneEvent = mockEvent(); @@ -176,9 +164,8 @@ void freezeHandling() throws InterruptedException { verify(statusActionSubmitter).submitStatusAction(any(FreezePeriodEnteredAction.class)); verify(waitForEventDurability).accept(keystoneEvent.getBaseEvent()); verify(swirldStateManager).handleConsensusRound(consensusRound); - assertEquals(consensusRoundNumber, roundAppliedToState.get()); verify(swirldStateManager).savedStateInFreezePeriod(); - verify(stateHashSignQueue).put(any(ReservedSignedState.class)); + verify(stateHashSignQueue).put(any(StateAndRound.class)); verify(platformState) .setRunningEventHash( events.getLast().getRunningHash().getFutureHash().getAndRethrow()); @@ -194,7 +181,7 @@ void freezeHandling() throws InterruptedException { verify(waitForEventDurability).accept(keystoneEvent.getBaseEvent()); verify(swirldStateManager).handleConsensusRound(consensusRound); verify(swirldStateManager).savedStateInFreezePeriod(); - verify(stateHashSignQueue).put(any(ReservedSignedState.class)); + verify(stateHashSignQueue).put(any(StateAndRound.class)); verify(platformState) .setRunningEventHash( events.getLast().getRunningHash().getFutureHash().getAndRethrow()); diff --git a/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/state/SwirldStateManagerTests.java
b/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/state/SwirldStateManagerTests.java index 2acd94d9981b..edf1b87a1362 100644 --- a/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/state/SwirldStateManagerTests.java +++ b/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/state/SwirldStateManagerTests.java @@ -53,7 +53,6 @@ void setup() { platformContext, addressBook, new NodeId(0L), - (a, b) -> {}, mock(SwirldStateMetrics.class), mock(StatusActionSubmitter.class), initialState, diff --git a/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/wiring/PlatformWiringTests.java b/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/wiring/PlatformWiringTests.java index 07993cebedb7..3ca100c6dbff 100644 --- a/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/wiring/PlatformWiringTests.java +++ b/platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/wiring/PlatformWiringTests.java @@ -42,6 +42,7 @@ import com.swirlds.platform.gossip.shadowgraph.Shadowgraph; import com.swirlds.platform.state.SwirldStateManager; import com.swirlds.platform.state.iss.IssDetector; +import com.swirlds.platform.state.iss.IssHandler; import com.swirlds.platform.state.signed.SignedStateFileManager; import com.swirlds.platform.state.signed.StateSignatureCollector; import com.swirlds.platform.util.HashLogger; @@ -82,6 +83,7 @@ void testBindings() { mock(EventStreamManager.class), mock(FutureEventBuffer.class), mock(IssDetector.class), + mock(IssHandler.class), mock(HashLogger.class), mock(LatestCompleteStateNotifier.class));
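The core pattern this diff introduces — the `ConsensusRoundHandler` pairing a reserved signed state with the round that produced it before handing both to the hash/sign queue, and the clearable draining that queue while closing every reservation — can be sketched in isolation. `ReservedState` below is a simplified stand-in for the platform's `ReservedSignedState` (which manages real reference counts), and the round is reduced to a number; only the queue hand-off and drain loop mirror the diff:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class StateAndRoundSketch {
    // Stand-in for ReservedSignedState (hypothetical; the real class tracks reservations)
    public record ReservedState(long stateRound) {
        public void close() { /* the real class releases its reservation here */ }
    }

    // Mirrors the new StateAndRound record: a reserved state plus the round that created it
    public record StateAndRound(ReservedState reservedSignedState, long roundNumber) {}

    // Mirrors the clearable lambda at the top of this diff: drain the queue, closing each
    // reservation so no signed state leaks when the pipeline is cleared
    public static int drain(final BlockingQueue<StateAndRound> stateHashSignQueue) {
        int closed = 0;
        StateAndRound stateAndRound = stateHashSignQueue.poll();
        while (stateAndRound != null) {
            stateAndRound.reservedSignedState().close();
            closed++;
            stateAndRound = stateHashSignQueue.poll();
        }
        return closed;
    }

    public static void main(final String[] args) throws InterruptedException {
        final BlockingQueue<StateAndRound> stateHashSignQueue = new ArrayBlockingQueue<>(16);
        // Producer side (as in createSignedState): enqueue the state together with its round
        stateHashSignQueue.put(new StateAndRound(new ReservedState(41), 41));
        stateHashSignQueue.put(new StateAndRound(new ReservedState(42), 42));
        System.out.println(drain(stateHashSignQueue)); // prints 2; queue is now empty
    }
}
```

Carrying the round alongside the state lets downstream consumers (hashing, signature collection, the ISS detector) learn which round produced a state without a separate `roundAppliedToStateConsumer` callback, which is why that consumer and the `MARKING_ROUND_COMPLETE` phase could be deleted above.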