Skip to content

Commit

Permalink
NAS-130509 / 25.04 / Add CI test for discovery auth (#15010)
Browse files Browse the repository at this point in the history
* Add ISCSIDiscover

* Add test__discover_from_initiator

* Robustize test: delay after changes when querying the STANDBY node
  • Loading branch information
bmeagherix authored Nov 22, 2024
1 parent d784286 commit b078b47
Show file tree
Hide file tree
Showing 3 changed files with 229 additions and 13 deletions.
168 changes: 157 additions & 11 deletions tests/api2/test_261_iscsi_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,21 @@
import pyscsi
import pytest
import requests
from assets.websocket.iscsi import (alua_enabled, initiator, initiator_portal,
portal, read_capacity16, target,
target_extent_associate, verify_capacity,
verify_luns, verify_ha_inquiry, verify_ha_device_identification, TUR)
from assets.websocket.iscsi import (TUR, alua_enabled, initiator, initiator_portal, portal, read_capacity16, target,
target_extent_associate, verify_capacity, verify_ha_device_identification,
verify_ha_inquiry, verify_luns)
from assets.websocket.service import ensure_service_enabled, ensure_service_started
from auto_config import ha, hostname, isns_ip, password, pool_name, user
from functions import SSH_TEST
from protocols import ISCSIDiscover, initiator_name_supported, iscsi_scsi_connection, isns_connection
from pyscsi.pyscsi.scsi_sense import sense_ascq_dict
from pytest_dependency import depends

from middlewared.service_exception import InstanceNotFound, ValidationError, ValidationErrors
from middlewared.test.integration.assets.iscsi import target_login_test
from middlewared.test.integration.assets.pool import dataset, snapshot
from middlewared.test.integration.utils import call, ssh
from middlewared.test.integration.utils.client import truenas_server
from pyscsi.pyscsi.scsi_sense import sense_ascq_dict
from pytest_dependency import depends

from auto_config import ha, hostname, isns_ip, password, pool_name, user
from functions import SSH_TEST
from protocols import (initiator_name_supported, iscsi_scsi_connection,
isns_connection)

# Setup some flags that will enable/disable tests based upon the capabilities of the
# python-scsi package in use
Expand Down Expand Up @@ -84,6 +82,14 @@ def __str__(self):
CONTROLLER_B_TARGET_PORT_GROUP_ID = 102
SERVICE_NAME = 'iscsitarget'

CHAPUSER1 = 'chapuser1'
CHAPPASS1 = 'chappassword1'

CHAPUSER2 = 'chapuser2'
CHAPPASS2 = 'userpassword2'
CHAPPEERUSER2 = 'chappeer2'
CHAPPEERPASS2 = 'peerpassword2'

# Some variables
digit = ''.join(random.choices(string.digits, k=2))
file_mountpoint = f'/tmp/iscsi-file-{hostname}'
Expand Down Expand Up @@ -785,6 +791,146 @@ def test__discovery_auth():
assert [] == call('iscsi.auth.query')


@contextlib.contextmanager
def _discovery(ip):
with ISCSIDiscover(ip) as nocred:
with ISCSIDiscover(ip, CHAPUSER1, CHAPPASS1) as user1:
with ISCSIDiscover(ip,
CHAPUSER2, CHAPPASS2,
CHAPPEERUSER2, CHAPPEERPASS2) as user2:
yield {
'nocred': nocred,
'user1': user1,
'user2': user2,
}


def _discovery_validate_one(disc: ISCSIDiscover, iqns: set):
result = disc.discover()
assert set(result.keys()) == iqns


def _discovery_validate_all(discs: dict, iqns: set):
for disc in discs.values():
_discovery_validate_one(disc, iqns)


def test__discover_from_initiator(iscsi_running):
"""
Verify that discovery auth operates as expected, by performing iSCSI
discovery operations from the initiator in various configs.
"""
name1 = f"{target_name}x1"
name2 = f"{target_name}x2"
iqn1 = f'{basename}:{name1}'
iqn2 = f'{basename}:{name2}'

EMPTY_SET = set()
ONE_IQN_SET = set([iqn1])
TWO_IQNS_SET = set([iqn1, iqn2])
DISCOVER_DELAY = 10

def _discovery_validate_two_targets(ip: str, discs: dict, delay: int | None = None):
if delay:
sleep(delay)
_discovery_validate_one(discs['nocred'], TWO_IQNS_SET)
_discovery_validate_one(discs['user1'], TWO_IQNS_SET)
_discovery_validate_one(discs['user2'], EMPTY_SET)
# Create an auth without discovery_auth and ensure it has
# no impact.
with iscsi_auth(1, CHAPUSER1, CHAPPASS1):
if delay:
sleep(delay)
_discovery_validate_one(discs['nocred'], TWO_IQNS_SET)
_discovery_validate_one(discs['user1'], TWO_IQNS_SET)
_discovery_validate_one(discs['user2'], EMPTY_SET)
# Create an auth with CHAP discovery_auth and ensure it means only
# a discovery with the correct cred works.
with iscsi_auth(1, CHAPUSER1, CHAPPASS1, discovery_auth='CHAP'):
if delay:
sleep(delay)
_discovery_validate_one(discs['nocred'], EMPTY_SET)
_discovery_validate_one(discs['user1'], TWO_IQNS_SET)
_discovery_validate_one(discs['user2'], EMPTY_SET)
with ISCSIDiscover(ip,
CHAPUSER1, "WrongChapPass") as baddisc:
_discovery_validate_one(baddisc, EMPTY_SET)
with ISCSIDiscover(ip,
"WrongChapUser", CHAPPASS1) as baddisc:
_discovery_validate_one(baddisc, EMPTY_SET)
# Create a 2nd auth and ensure they both work
with iscsi_auth(2, CHAPUSER2, CHAPPASS2, discovery_auth='CHAP'):
if delay:
sleep(delay)
_discovery_validate_one(discs['nocred'], EMPTY_SET)
_discovery_validate_one(discs['user1'], TWO_IQNS_SET)
_discovery_validate_one(discs['user2'], EMPTY_SET)
with ISCSIDiscover(ip,
CHAPUSER2, CHAPPASS2) as gooddisc:
_discovery_validate_one(gooddisc, TWO_IQNS_SET)
# Create an auth with CHAP_MUTUAL discovery_auth and ensure it means only
# a discovery with the correct cred works.
with iscsi_auth(2, CHAPUSER2, CHAPPASS2,
CHAPPEERUSER2, CHAPPEERPASS2,
discovery_auth='CHAP_MUTUAL'):
if delay:
sleep(delay)
_discovery_validate_one(discs['nocred'], EMPTY_SET)
_discovery_validate_one(discs['user1'], EMPTY_SET)
_discovery_validate_one(discs['user2'], TWO_IQNS_SET)
with ISCSIDiscover(ip,
"WrongChapUser", CHAPPASS2,
CHAPPEERUSER2, CHAPPEERPASS2) as baddisc:
_discovery_validate_one(baddisc, EMPTY_SET)
with ISCSIDiscover(ip,
CHAPUSER2, "WrongChapPass",
CHAPPEERUSER2, CHAPPEERPASS2) as baddisc:
_discovery_validate_one(baddisc, EMPTY_SET)
with ISCSIDiscover(ip,
CHAPUSER2, CHAPPASS2,
"WrongChapPeer", CHAPPEERPASS2) as baddisc:
_discovery_validate_one(baddisc, EMPTY_SET)
with ISCSIDiscover(ip,
CHAPUSER2, CHAPPASS2,
CHAPPEERUSER2, "WrongPeerPass") as baddisc:
_discovery_validate_one(baddisc, EMPTY_SET)

with initiator_portal() as config:
with _discovery(truenas_server.ip) as discs:
# No targets published yet, ensure we see none via discovery
_discovery_validate_all(discs, EMPTY_SET)
with configured_target(config, name1, "VOLUME"):
# One target published, ensure we see it via discovery
_discovery_validate_one(discs['nocred'], ONE_IQN_SET)
_discovery_validate_one(discs['user1'], ONE_IQN_SET)
_discovery_validate_one(discs['user2'], EMPTY_SET)
with configured_target(config, name2, "VOLUME"):
# Two target published, ensure we see them via discovery
_discovery_validate_two_targets(truenas_server.ip, discs)
if ha:
# If we are a HA system then enable ALUA and perform a bunch of
# similar tests
with alua_enabled():
_ensure_alua_state(True)
_wait_for_alua_settle()
with _discovery(truenas_server.nodea_ip) as nodea_discs:
with _discovery(truenas_server.nodeb_ip) as nodeb_discs:
# No targets published yet, ensure we see none via discovery
_discovery_validate_all(nodea_discs, EMPTY_SET)
_discovery_validate_all(nodeb_discs, EMPTY_SET)
with configured_target(config, name1, "VOLUME"):
with configured_target(config, name2, "VOLUME"):
# We will delay after changes when querying the STANDBY node
node = call('failover.node')
nodeb_delay = DISCOVER_DELAY if node == 'A' else None
nodea_delay = DISCOVER_DELAY if node == 'B' else None
_discovery_validate_two_targets(truenas_server.nodea_ip, nodea_discs, nodea_delay)
_discovery_validate_two_targets(truenas_server.nodeb_ip, nodeb_discs, nodeb_delay)

# Turned off ALUA again
_wait_for_alua_settle()


def test__report_luns(iscsi_running):
"""
This tests REPORT LUNS and accessing multiple LUNs on a target.
Expand Down
4 changes: 2 additions & 2 deletions tests/protocols/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

from functions import DELETE, POST

from .ftp_proto import ftp_connect, ftps_connect, ftp_connection, ftps_connection # noqa
from .iscsi_proto import initiator_name_supported, iscsi_scsi_connect, iscsi_scsi_connection # noqa
from .ftp_proto import ftp_connect, ftp_connection, ftps_connect, ftps_connection # noqa
from .iscsi_proto import ISCSIDiscover, initiator_name_supported, iscsi_scsi_connect, iscsi_scsi_connection # noqa
from .iSNSP.client import iSNSPClient
from .ms_rpc import MS_RPC # noqa
from .nfs_proto import SSH_NFS # noqa
Expand Down
70 changes: 70 additions & 0 deletions tests/protocols/iscsi_proto.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import contextlib
import inspect
import socket

import iscsi
from functions import SRVTarget, get_host_ip
from pyscsi.pyscsi.scsi import SCSI
from pyscsi.utils import init_device

Expand Down Expand Up @@ -75,3 +78,70 @@ def iscsi_scsi_connection(host, iqn, lun=0, user=None, secret=None, target_user=
yield s
finally:
s.device.close()


class ISCSIDiscover:
def __init__(self,
hostname=None,
initiator_username=None,
initiator_password=None,
target_username=None,
target_password=None,
initiator_name=None,
):
self._hostname = hostname or get_host_ip(SRVTarget.DEFAULT)
self._initiator_username = None
self._initiator_password = None
self._target_username = None
self._target_password = None
self._initiator_name = None

try:
self._ip = socket.gethostbyname(self._hostname)
except socket.error:
raise ValueError(f'Cannot resolve: {self._hostname}')

if initiator_username is not None or initiator_password is not None:
if initiator_username is None or initiator_password is None:
raise ValueError("If supply one then must supply both: initiator_username, initiator_password")
self._initiator_username = initiator_username
self._initiator_password = initiator_password

if target_username is not None or target_password is not None:
if target_username is None or target_password is None:
raise ValueError("If supply one then must supply both: target_username, target_password")
self._target_username = target_username
self._target_password = target_password

if initiator_name:
self._initiator_name = initiator_name
else:
self._initiator_name = f'iqn.2018-01.org.pyscsi:{socket.gethostname()}'

def __enter__(self):
return self

def discover(self):
connected = False
try:
ctx = iscsi.Context(self._initiator_name)
ctx.set_session_type(iscsi.ISCSI_SESSION_DISCOVERY)
ctx.set_header_digest(iscsi.ISCSI_HEADER_DIGEST_NONE)
if self._initiator_username and self._initiator_password:
ctx.set_initiator_username_pwd(self._initiator_username, self._initiator_password)
if self._target_username and self._target_password:
ctx.set_target_username_pwd(self._target_username, self._target_password)
ctx.connect(self._ip, -1)
connected = True
return ctx.discover()
except Exception:
return {}
finally:
if connected:
ctx.disconnect()

def ip(self):
return self._ip

def __exit__(self, exc_type, exc_val, exc_tb):
pass

0 comments on commit b078b47

Please sign in to comment.