Skip to content

Commit

Permalink
Merge pull request #2043 from input-output-hk/add_test_dreps_delegation
Browse files Browse the repository at this point in the history
Add `test_dreps_delegation`
  • Loading branch information
mkoura authored Nov 21, 2023
2 parents f771156 + 9e7b4c4 commit c7a7df8
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 19 deletions.
153 changes: 134 additions & 19 deletions cardano_node_tests/tests/tests_conway/test_drep.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
"""Tests for Conway governance DRep functionality."""
import logging
import pathlib as pl
import typing as tp

import allure
import pytest
from _pytest.fixtures import FixtureRequest
from cardano_clusterlib import clusterlib

from cardano_node_tests.cluster_management import cluster_management
from cardano_node_tests.tests import common
from cardano_node_tests.utils import cluster_nodes
from cardano_node_tests.utils import clusterlib_utils
from cardano_node_tests.utils import helpers
from cardano_node_tests.utils import submit_utils
Expand All @@ -29,10 +30,15 @@ def payment_addr(
cluster: clusterlib.ClusterLib,
) -> clusterlib.AddressRecord:
"""Create new payment address."""
addr = clusterlib_utils.create_payment_addr_records(
f"chain_tx_addr_ci{cluster_manager.cluster_instance_num}",
cluster_obj=cluster,
)[0]
with cluster_manager.cache_fixture() as fixture_cache:
if fixture_cache.value:
return fixture_cache.value # type: ignore

addr = clusterlib_utils.create_payment_addr_records(
f"drep_addr_ci{cluster_manager.cluster_instance_num}",
cluster_obj=cluster,
)[0]
fixture_cache.value = addr

# Fund source address
clusterlib_utils.fund_from_faucet(
Expand All @@ -45,17 +51,67 @@ def payment_addr(


@pytest.fixture
def pool_users_disposable(
def pool_user(
cluster_manager: cluster_management.ClusterManager,
cluster: clusterlib.ClusterLib,
) -> tp.List[clusterlib.PoolUser]:
"""Create function scoped pool users."""
) -> clusterlib.PoolUser:
"""Create a pool user."""
test_id = common.get_test_id(cluster)
pool_users = clusterlib_utils.create_pool_users(
pool_user = clusterlib_utils.create_pool_users(
cluster_obj=cluster,
name_template=f"{test_id}_pool_user",
no_of_addr=2,
no_of_addr=1,
)[0]

# Fund the payment address with some ADA
clusterlib_utils.fund_from_faucet(
pool_user.payment,
cluster_obj=cluster,
faucet_data=cluster_manager.cache.addrs_data["user1"],
amount=1_500_000,
)
return pool_users
return pool_user


@pytest.fixture
def custom_drep(
cluster_manager: cluster_management.ClusterManager,
cluster: clusterlib.ClusterLib,
payment_addr: clusterlib.AddressRecord,
) -> clusterlib_utils.DRepRegistration:
"""Create a custom DRep."""
if cluster_nodes.get_cluster_type().type != cluster_nodes.ClusterType.LOCAL:
pytest.skip("runs only on local cluster")

with cluster_manager.cache_fixture() as fixture_cache:
if fixture_cache.value:
return fixture_cache.value # type: ignore

reg_drep = clusterlib_utils.register_drep(
cluster_obj=cluster,
name_template="drep_custom",
payment_addr=payment_addr,
submit_method=submit_utils.SubmitMethods.CLI,
use_build_cmd=True,
)
fixture_cache.value = reg_drep

return reg_drep


def _check_delegation(deleg_state: dict, drep_id: str, stake_addr_hash: str) -> None:
drep_records = deleg_state["dstate"]["unified"]["credentials"]

stake_addr_key = f"keyHash-{stake_addr_hash}"
stake_addr_val = drep_records.get(stake_addr_key) or {}

expected_drep = f"drep-keyHash-{drep_id}"
if drep_id == "always_abstain":
expected_drep = "drep-alwaysAbstain"
elif drep_id == "always_no_confidence":
expected_drep = "drep-alwaysNoConfidence"

assert stake_addr_val.get("drep") == expected_drep


class TestDReps:
Expand Down Expand Up @@ -171,22 +227,33 @@ class TestDelegDReps:
@allure.link(helpers.get_vcs_link())
@submit_utils.PARAM_SUBMIT_METHOD
@common.PARAM_USE_BUILD_CMD
def test_always_abstain(
@pytest.mark.parametrize("drep", ("always_abstain", "always_no_confidence", "custom"))
@pytest.mark.testnets
@pytest.mark.smoke
def test_dreps_delegation(
self,
cluster: clusterlib.ClusterLib,
payment_addr: clusterlib.AddressRecord,
pool_users_disposable: tp.List[clusterlib.PoolUser],
pool_user: clusterlib.PoolUser,
custom_drep: clusterlib_utils.DRepRegistration,
testfile_temp_dir: pl.Path,
request: FixtureRequest,
use_build_cmd: bool,
submit_method: str,
drep: str,
):
"""Test delegating to always-abstain default DRep.
"""Test delegating to DReps.
* register stake address
* delegate stake to always-abstain default DRep
* delegate stake to following DReps:
- always-abstain
- always-no-confidence
- custom DRep
* check that stake address is registered
"""
temp_template = common.get_test_id(cluster)
pool_user = pool_users_disposable[0]
deposit_amt = cluster.g_query.get_address_deposit()

# Create stake address registration cert
Expand All @@ -200,7 +267,9 @@ def test_always_abstain(
deleg_cert = cluster.g_stake_address.gen_vote_delegation_cert(
addr_name=temp_template,
stake_vkey_file=pool_user.stake.vkey_file,
always_abstain=True,
drep_key_hash=custom_drep.drep_id if drep == "custom" else "",
always_abstain=drep == "always_abstain",
always_no_confidence=drep == "always_no_confidence",
)

tx_files = clusterlib.TxFiles(
Expand Down Expand Up @@ -235,13 +304,44 @@ def test_always_abstain(
signing_key_files=tx_files.signing_key_files,
tx_name=temp_template,
)

# Make sure we have enough time to finish the registration/delegation in one epoch
clusterlib_utils.wait_for_epoch_interval(
cluster_obj=cluster, start=1, stop=common.EPOCH_STOP_SEC_LEDGER_STATE
)

submit_utils.submit_tx(
submit_method=submit_method,
cluster_obj=cluster,
tx_file=tx_signed,
txins=tx_output.txins,
)

# Deregister stake address so it doesn't affect stake distribution
def _deregister():
with helpers.change_cwd(testfile_temp_dir):
# Deregister stake address
stake_addr_dereg_cert = cluster.g_stake_address.gen_stake_addr_deregistration_cert(
addr_name=f"{temp_template}_addr0",
deposit_amt=deposit_amt,
stake_vkey_file=pool_user.stake.vkey_file,
)
tx_files_dereg = clusterlib.TxFiles(
certificate_files=[stake_addr_dereg_cert],
signing_key_files=[
payment_addr.skey_file,
pool_user.stake.skey_file,
],
)
cluster.g_transaction.send_tx(
src_address=payment_addr.address,
tx_name=f"{temp_template}_dereg",
tx_files=tx_files_dereg,
deposit=-deposit_amt,
)

request.addfinalizer(_deregister)

assert cluster.g_query.get_stake_addr_info(
pool_user.stake.address
).address, f"Stake address is NOT registered: {pool_user.stake.address}"
Expand All @@ -252,5 +352,20 @@ def test_always_abstain(
== clusterlib.calculate_utxos_balance(tx_output.txins) - tx_output.fee - deposit_amt
), f"Incorrect balance for source address `{payment_addr.address}`"

# Check that stake address is delegated to always-abstain default DRep
# TODO
# Check that stake address is delegated to the correct DRep.
# This takes one epoch, so test this only for selected combinations of build command
# and submit method, and only if we are not running smoke tests.
if (
use_build_cmd
and submit_method == submit_utils.SubmitMethods.CLI
and "smoke" not in request.config.getoption("-m")
):
cluster.wait_for_new_epoch(padding_seconds=5)
deleg_state = clusterlib_utils.get_delegation_state(cluster_obj=cluster)
drep_id = custom_drep.drep_id if drep == "custom" else drep
stake_addr_hash = cluster.g_stake_address.get_stake_vkey_hash(
stake_vkey_file=pool_user.stake.vkey_file
)
_check_delegation(
deleg_state=deleg_state, drep_id=drep_id, stake_addr_hash=stake_addr_hash
)
32 changes: 32 additions & 0 deletions cardano_node_tests/utils/clusterlib_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,38 @@ def filtered_ledger_state(
return helpers.run_in_bash(cmd).decode("utf-8").strip()


def get_delegation_state(
cluster_obj: clusterlib.ClusterLib,
) -> dict:
"""Get `delegationState` section of ledger state."""
cardano_cli_args = [
"cardano-cli",
"query",
"ledger-state",
*cluster_obj.magic_args,
f"--{cluster_obj.protocol}-mode",
]
cardano_cmd = " ".join(cardano_cli_args)

# Record cli coverage
clusterlib.record_cli_coverage(
cli_args=cardano_cli_args, coverage_dict=cluster_obj.cli_coverage
)

# Get rid of a huge amount of data we don't have any use for
cmd = (
f"{cardano_cmd} | jq -n --stream -c "
"'fromstream(3|truncate_stream(inputs|select(.[0][2] == \"delegationState\")))'"
)

deleg_state_raw = helpers.run_in_bash(cmd).decode("utf-8").strip()
if not deleg_state_raw:
return {}

deleg_state: dict = json.loads(deleg_state_raw)
return deleg_state


def get_blocks_before(
cluster_obj: clusterlib.ClusterLib,
) -> tp.Dict[str, int]:
Expand Down

0 comments on commit c7a7df8

Please sign in to comment.