Skip to content

Commit

Permalink
Merge pull request #537 from cbodley/wip-iam-user-apis
Browse files Browse the repository at this point in the history
iam: add tests for account-based IAM apis
  • Loading branch information
cbodley authored Apr 12, 2024
2 parents 88fd867 + dfabbf5 commit 54c1488
Show file tree
Hide file tree
Showing 7 changed files with 2,447 additions and 115 deletions.
11 changes: 9 additions & 2 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,28 @@ markers =
fails_on_rgw
fails_on_s3
fails_with_subdomain
group
group_policy
iam_account
iam_cross_account
iam_role
iam_tenant
iam_user
lifecycle
lifecycle_expiration
lifecycle_transition
list_objects_v2
object_lock
role_policy
session_policy
s3select
s3website
s3website_routing_rules
s3website_redirect_location
3website
sns
sse_s3
storage_class
tagging
test_of_iam
test_of_sts
token_claims_trust_policy_test
token_principal_tag_role_policy_test
Expand Down
22 changes: 22 additions & 0 deletions s3tests.conf.SAMPLE
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ ssl_verify = False
## the prefix to 30 characters long, and avoid collisions
bucket prefix = yournamehere-{random}-

# all the iam account resources (users, roles, etc) created
# will start with this name prefix
iam name prefix = s3-tests-

# all the iam account resources (users, roles, etc) created
# will start with this path prefix
iam path prefix = /s3-tests/

[s3 main]
# main display_name set in vstart.sh
display_name = M. Tester
Expand Down Expand Up @@ -127,6 +135,20 @@ secret_key = abcdefghijklmnopqrstuvwxyzabcdefghijklmn
#display_name from vstart.sh
display_name = youruseridhere

# iam account root user for iam_account tests
[iam root]
access_key = AAAAAAAAAAAAAAAAAAaa
secret_key = aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
user_id = RGW11111111111111111
email = [email protected]

# iam account root user in a different account than [iam root]
[iam alt root]
access_key = BBBBBBBBBBBBBBBBBBbb
secret_key = bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
user_id = RGW22222222222222222
email = [email protected]

#following section needs to be added when you want to run Assume Role With Webidentity test
[webidentity]
#used for assume role with web identity test in sts-tests
Expand Down
144 changes: 92 additions & 52 deletions s3tests_boto3/functional/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def configured_storage_classes():

return sc

def setup():
def configure():
cfg = configparser.RawConfigParser()
try:
path = os.environ['S3TEST_CONF']
Expand Down Expand Up @@ -260,24 +260,41 @@ def setup():
config.tenant_user_id = cfg.get('s3 tenant',"user_id")
config.tenant_email = cfg.get('s3 tenant',"email")

config.iam_access_key = cfg.get('iam',"access_key")
config.iam_secret_key = cfg.get('iam',"secret_key")
config.iam_display_name = cfg.get('iam',"display_name")
config.iam_user_id = cfg.get('iam',"user_id")
config.iam_email = cfg.get('iam',"email")

config.iam_root_access_key = cfg.get('iam root',"access_key")
config.iam_root_secret_key = cfg.get('iam root',"secret_key")
config.iam_root_user_id = cfg.get('iam root',"user_id")
config.iam_root_email = cfg.get('iam root',"email")

config.iam_alt_root_access_key = cfg.get('iam alt root',"access_key")
config.iam_alt_root_secret_key = cfg.get('iam alt root',"secret_key")
config.iam_alt_root_user_id = cfg.get('iam alt root',"user_id")
config.iam_alt_root_email = cfg.get('iam alt root',"email")

# vars from the fixtures section
try:
template = cfg.get('fixtures', "bucket prefix")
except (configparser.NoOptionError):
template = 'test-{random}-'
template = cfg.get('fixtures', "bucket prefix", fallback='test-{random}-')
prefix = choose_bucket_prefix(template=template)

alt_client = get_alt_client()
tenant_client = get_tenant_client()
nuke_prefixed_buckets(prefix=prefix)
nuke_prefixed_buckets(prefix=prefix, client=alt_client)
nuke_prefixed_buckets(prefix=prefix, client=tenant_client)
template = cfg.get('fixtures', "iam name prefix", fallback="s3-tests-")
config.iam_name_prefix = choose_bucket_prefix(template=template)
template = cfg.get('fixtures', "iam path prefix", fallback="/s3-tests/")
config.iam_path_prefix = choose_bucket_prefix(template=template)

if cfg.has_section("s3 cloud"):
get_cloud_config(cfg)
else:
config.cloud_storage_class = None

def setup():
alt_client = get_alt_client()
tenant_client = get_tenant_client()
nuke_prefixed_buckets(prefix=prefix)
nuke_prefixed_buckets(prefix=prefix, client=alt_client)
nuke_prefixed_buckets(prefix=prefix, client=tenant_client)

def teardown():
alt_client = get_alt_client()
Expand Down Expand Up @@ -306,11 +323,12 @@ def teardown():

@pytest.fixture(scope="package")
def configfile():
setup()
configure()
return config

@pytest.fixture(autouse=True)
def setup_teardown(configfile):
setup()
yield
teardown()

Expand Down Expand Up @@ -395,64 +413,65 @@ def get_v2_client():
config=Config(signature_version='s3'))
return client

def get_sts_client(client_config=None):
if client_config == None:
client_config = Config(signature_version='s3v4')
def get_sts_client(**kwargs):
kwargs.setdefault('aws_access_key_id', config.alt_access_key)
kwargs.setdefault('aws_secret_access_key', config.alt_secret_key)
kwargs.setdefault('config', Config(signature_version='s3v4'))

client = boto3.client(service_name='sts',
aws_access_key_id=config.alt_access_key,
aws_secret_access_key=config.alt_secret_key,
endpoint_url=config.default_endpoint,
region_name='',
use_ssl=config.default_is_secure,
verify=config.default_ssl_verify,
config=client_config)
endpoint_url=config.default_endpoint,
region_name='',
use_ssl=config.default_is_secure,
verify=config.default_ssl_verify,
**kwargs)
return client

def get_iam_client(client_config=None):
cfg = configparser.RawConfigParser()
try:
path = os.environ['S3TEST_CONF']
except KeyError:
raise RuntimeError(
'To run tests, point environment '
+ 'variable S3TEST_CONF to a config file.',
)
cfg.read(path)
if not cfg.has_section("iam"):
raise RuntimeError('Your config file is missing the "iam" section!')

config.iam_access_key = cfg.get('iam',"access_key")
config.iam_secret_key = cfg.get('iam',"secret_key")
config.iam_display_name = cfg.get('iam',"display_name")
config.iam_user_id = cfg.get('iam',"user_id")
config.iam_email = cfg.get('iam',"email")
def get_iam_client(**kwargs):
kwargs.setdefault('aws_access_key_id', config.iam_access_key)
kwargs.setdefault('aws_secret_access_key', config.iam_secret_key)

if client_config == None:
client_config = Config(signature_version='s3v4')

client = boto3.client(service_name='iam',
aws_access_key_id=config.iam_access_key,
aws_secret_access_key=config.iam_secret_key,
endpoint_url=config.default_endpoint,
region_name='',
use_ssl=config.default_is_secure,
verify=config.default_ssl_verify,
config=client_config)
**kwargs)
return client

def get_iam_s3client(client_config=None):
if client_config == None:
client_config = Config(signature_version='s3v4')
def get_iam_s3client(**kwargs):
kwargs.setdefault('aws_access_key_id', config.iam_access_key)
kwargs.setdefault('aws_secret_access_key', config.iam_secret_key)
kwargs.setdefault('config', Config(signature_version='s3v4'))

client = boto3.client(service_name='s3',
aws_access_key_id=get_iam_access_key(),
aws_secret_access_key=get_iam_secret_key(),
endpoint_url=config.default_endpoint,
use_ssl=config.default_is_secure,
verify=config.default_ssl_verify,
config=client_config)
**kwargs)
return client

def get_iam_root_client(**kwargs):
kwargs.setdefault('service_name', 'iam')
kwargs.setdefault('aws_access_key_id', config.iam_root_access_key)
kwargs.setdefault('aws_secret_access_key', config.iam_root_secret_key)

return boto3.client(endpoint_url=config.default_endpoint,
region_name='',
use_ssl=config.default_is_secure,
verify=config.default_ssl_verify,
**kwargs)

def get_iam_alt_root_client(**kwargs):
kwargs.setdefault('service_name', 'iam')
kwargs.setdefault('aws_access_key_id', config.iam_alt_root_access_key)
kwargs.setdefault('aws_secret_access_key', config.iam_alt_root_secret_key)

return boto3.client(endpoint_url=config.default_endpoint,
region_name='',
use_ssl=config.default_is_secure,
verify=config.default_ssl_verify,
**kwargs)

def get_alt_client(client_config=None):
if client_config == None:
client_config = Config(signature_version='s3v4')
Expand Down Expand Up @@ -699,12 +718,33 @@ def get_token():
def get_realm_name():
return config.webidentity_realm

def get_iam_name_prefix():
return config.iam_name_prefix

def make_iam_name(name):
return config.iam_name_prefix + name

def get_iam_path_prefix():
return config.iam_path_prefix

def get_iam_access_key():
return config.iam_access_key

def get_iam_secret_key():
return config.iam_secret_key

def get_iam_root_user_id():
return config.iam_root_user_id

def get_iam_root_email():
return config.iam_root_email

def get_iam_alt_root_user_id():
return config.iam_alt_root_user_id

def get_iam_alt_root_email():
return config.iam_alt_root_email

def get_user_token():
return config.webidentity_user_token

Expand Down
Loading

0 comments on commit 54c1488

Please sign in to comment.