Skip to content

Commit

Permalink
create --duration option using sts.assume_role(), closes #27
Browse files Browse the repository at this point in the history
Also with new integration test, refs #30
  • Loading branch information
simonw committed Nov 11, 2021
1 parent f70e76c commit f6fd163
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 14 deletions.
118 changes: 117 additions & 1 deletion s3_credentials/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import click
import configparser
import json
import re
from . import policies


Expand Down Expand Up @@ -90,6 +91,26 @@ def convert(self, policy, param, ctx):
self.fail("File not found")


class DurationParam(click.ParamType):
name = "duration"
pattern = re.compile(r"^(\d+)(m|h|s)?$")

def convert(self, value, param, ctx):
match = self.pattern.match(value)
if match is None:
self.fail("Duration must be of form 3600s or 15m or 2h")
integer_string, suffix = match.groups()
integer = int(integer_string)
if suffix == "m":
integer *= 60
elif suffix == "h":
integer *= 3600
# Must be between 15 minutes and 12 hours
if not (15 * 60 <= integer <= 12 * 60 * 60):
self.fail("Duration must be between 15 minutes and 12 hours")
return integer


@cli.command()
@click.argument(
"buckets",
Expand All @@ -104,6 +125,12 @@ def convert(self, policy, param, ctx):
default="json",
help="Output format for credentials",
)
@click.option(
"-d",
"--duration",
type=DurationParam(),
help="How long should these credentials work for? Default is forever, use 3600 for 3600 seconds, 15m for 15 minutes, 1h for 1 hour",
)
@click.option("--username", help="Username to create or existing user to use")
@click.option(
"-c",
Expand Down Expand Up @@ -132,6 +159,7 @@ def convert(self, policy, param, ctx):
def create(
buckets,
format_,
duration,
username,
create_bucket,
read_only,
Expand Down Expand Up @@ -159,6 +187,7 @@ def log(message):
permission = "write-only"
s3 = make_client("s3", **boto_options)
iam = make_client("iam", **boto_options)
sts = boto3.client("sts")
# Verify buckets
for bucket in buckets:
# Create bucket if it doesn't exist
Expand All @@ -182,7 +211,42 @@ def log(message):
if bucket_region:
info += "in region: {}".format(bucket_region)
log(info)
# Buckets created - now create the user, if needed
# At this point the buckets definitely exist - create the inline policy
bucket_access_policy = {}
if policy:
bucket_access_policy = json.loads(policy.replace("$!BUCKET_NAME!$", bucket))
else:
statements = []
if permission == "read-write":
for bucket in buckets:
statements.extend(policies.read_write_statements(bucket))
elif permission == "read-only":
for bucket in buckets:
statements.extend(policies.read_only_statements(bucket))
elif permission == "write-only":
for bucket in buckets:
statements.extend(policies.write_only_statements(bucket))
else:
assert False, "Unknown permission: {}".format(permission)
bucket_access_policy = policies.wrap_policy(statements)

if duration:
# We're going to use sts.assume_role() rather than creating a user
s3_role_arn = ensure_s3_role_exists(iam, sts)
log("Assume role against {} for {}s".format(s3_role_arn, duration))
credentials_response = sts.assume_role(
RoleArn=s3_role_arn,
RoleSessionName="s3.{permission}.{buckets}".format(
permission="custom" if policy else permission, buckets=",".join(buckets)
),
Policy=json.dumps(bucket_access_policy),
DurationSeconds=duration,
)
click.echo(
json.dumps(credentials_response["Credentials"], indent=4, default=str)
)
return
# No duration, so wo create a new user so we can issue non-expiring credentials
if not username:
# Default username is "s3.read-write.bucket1,bucket2"
username = "s3.{permission}.{buckets}".format(
Expand Down Expand Up @@ -430,3 +494,55 @@ def make_client(service, access_key, secret_key, session_token, endpoint_url, au
if endpoint_url:
kwargs["endpoint_url"] = endpoint_url
return boto3.client(service, **kwargs)


def ensure_s3_role_exists(iam, sts):
"Create s3-credentials.AmazonS3FullAccess role if not exists, return ARN"
role_name = "s3-credentials.AmazonS3FullAccess"
account_id = sts.get_caller_identity()["Account"]
try:
role = iam.get_role(RoleName=role_name)
return role["Role"]["Arn"]
except iam.exceptions.NoSuchEntityException:
create_role_response = iam.create_role(
Description=(
"Role used by the s3-credentials tool to create time-limited "
"credentials that are restricted to specific buckets"
),
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::{}:root".format(account_id)
},
"Action": "sts:AssumeRole",
"Condition": {},
}
],
}
),
)
# Attach AmazonS3FullAccess to it - note that even though we use full access
# on the role itself any time we call sts.assume_role() we attach an additional
# policy to ensure reduced access for the temporary credentials
iam.attach_role_policy(
RoleName="s3-credentials.AmazonS3FullAccess",
PolicyArn="arn:aws:iam::aws:policy/AmazonS3FullAccess",
)
return create_role_response["Role"]["Arn"]


@cli.command()
@click.argument("bucket")
@common_boto3_options
def list_bucket(bucket, **boto_options):
"List content of bucket"
s3 = make_client("s3", **boto_options)
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=bucket):
for row in page["Contents"]:
click.echo(json.dumps(row, indent=4, default=str))
8 changes: 4 additions & 4 deletions s3_credentials/policies.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
def read_write(bucket):
return _policy(read_write_statements(bucket))
return wrap_policy(read_write_statements(bucket))


def read_write_statements(bucket):
Expand All @@ -14,7 +14,7 @@ def read_write_statements(bucket):


def read_only(bucket):
return _policy(read_only_statements(bucket))
return wrap_policy(read_only_statements(bucket))


def read_only_statements(bucket):
Expand All @@ -40,7 +40,7 @@ def read_only_statements(bucket):


def write_only(bucket):
return _policy(write_only_statements(bucket))
return wrap_policy(write_only_statements(bucket))


def write_only_statements(bucket):
Expand All @@ -54,5 +54,5 @@ def write_only_statements(bucket):
]


def _policy(statements):
def wrap_policy(statements):
return {"Version": "2012-10-17", "Statement": statements}
48 changes: 48 additions & 0 deletions tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
# and clean up after themselves
from click.testing import CliRunner
from s3_credentials.cli import bucket_exists, cli
import botocore
import boto3
import datetime
import json
import pytest
import secrets
Expand Down Expand Up @@ -48,6 +50,52 @@ def test_create_bucket_with_read_write():
assert credentials_response["Body"].read() == b"hello"


def test_create_bucket_read_only_duration_15():
bucket_name = "s3-credentials-tests.read-only.{}".format(secrets.token_hex(4))
s3 = boto3.client("s3")
assert not bucket_exists(s3, bucket_name)
credentials_decoded = json.loads(
get_output("create", bucket_name, "-c", "--duration", "15m", "--read-only")
)
assert set(credentials_decoded.keys()) == {
"AccessKeyId",
"SecretAccessKey",
"SessionToken",
"Expiration",
}
# Expiration should be ~15 minutes in the future
delta = (
datetime.datetime.fromisoformat(credentials_decoded["Expiration"])
- datetime.datetime.now(datetime.timezone.utc)
).total_seconds()
# Should be around about 900 seconds
assert 800 < delta < 1000
# Wait for everything to exist
time.sleep(10)
# Create client with these credentials
credentials_s3 = boto3.session.Session(
aws_access_key_id=credentials_decoded["AccessKeyId"],
aws_secret_access_key=credentials_decoded["SecretAccessKey"],
aws_session_token=credentials_decoded["SessionToken"],
).client("s3")
# Client should NOT be allowed to write objects
with pytest.raises(botocore.exceptions.ClientError):
credentials_s3.put_object(
Body="hello".encode("utf-8"), Bucket=bucket_name, Key="hello.txt"
)
# Write an object using root credentials
s3.put_object(
Body="hello read-only".encode("utf-8"),
Bucket=bucket_name,
Key="hello-read-only.txt",
)
# Client should be able to read this
credentials_response = credentials_s3.get_object(
Bucket=bucket_name, Key="hello-read-only.txt"
)
assert credentials_response["Body"].read() == b"hello read-only"


def get_output(*args, input=None):
runner = CliRunner(mix_stderr=False)
with runner.isolated_filesystem():
Expand Down
79 changes: 70 additions & 9 deletions tests/test_s3_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,16 +288,20 @@ def test_list_buckets_details(stub_s3):
WRITE_ONLY_POLICY = '{"Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Action": ["s3:PutObject"], "Resource": ["arn:aws:s3:::pytest-bucket-simonw-1/*"]}]}'


# Used by both test_create and test_create_duration
CREATE_TESTS = (
([], False, READ_WRITE_POLICY, "read-write"),
(["--read-only"], False, READ_ONLY_POLICY, "read-only"),
(["--write-only"], False, WRITE_ONLY_POLICY, "write-only"),
(["--policy", "POLICYFILEPATH"], False, CUSTOM_POLICY, "custom"),
(["--policy", "-"], True, CUSTOM_POLICY, "custom"),
(["--policy", CUSTOM_POLICY], False, CUSTOM_POLICY, "custom"),
)


@pytest.mark.parametrize(
"options,use_policy_stdin,expected_policy,expected_name_fragment",
(
([], False, READ_WRITE_POLICY, "read-write"),
(["--read-only"], False, READ_ONLY_POLICY, "read-only"),
(["--write-only"], False, WRITE_ONLY_POLICY, "write-only"),
(["--policy", "POLICYFILEPATH"], False, CUSTOM_POLICY, "custom"),
(["--policy", "-"], True, CUSTOM_POLICY, "custom"),
(["--policy", CUSTOM_POLICY], False, CUSTOM_POLICY, "custom"),
),
CREATE_TESTS,
)
def test_create(
mocker, tmpdir, options, use_policy_stdin, expected_policy, expected_name_fragment
Expand All @@ -318,7 +322,7 @@ def test_create(
kwargs = {}
if use_policy_stdin:
kwargs["input"] = CUSTOM_POLICY
result = runner.invoke(cli, args, **kwargs)
result = runner.invoke(cli, args, **kwargs, catch_exceptions=False)
assert result.exit_code == 0
assert result.output == (
"Attached policy s3.NAME_FRAGMENT.pytest-bucket-simonw-1 to user s3.NAME_FRAGMENT.pytest-bucket-simonw-1\n"
Expand All @@ -328,6 +332,7 @@ def test_create(
assert [str(c) for c in boto3.mock_calls] == [
"call('s3')",
"call('iam')",
"call('sts')",
"call().head_bucket(Bucket='pytest-bucket-simonw-1')",
"call().get_user(UserName='s3.{}.pytest-bucket-simonw-1')".format(
expected_name_fragment
Expand All @@ -343,6 +348,62 @@ def test_create(
]


@pytest.mark.parametrize(
"options,use_policy_stdin,expected_policy,expected_name_fragment",
CREATE_TESTS,
)
def test_create_duration(
mocker, tmpdir, options, use_policy_stdin, expected_policy, expected_name_fragment
):
boto3 = mocker.patch("boto3.client")
boto3.return_value = Mock()
boto3.return_value.create_access_key.return_value = {
"AccessKey": {"AccessKeyId": "access", "SecretAccessKey": "secret"}
}
boto3.return_value.get_caller_identity.return_value = {"Account": "1234"}
boto3.return_value.get_role.return_value = {"Role": {"Arn": "arn:::role"}}
boto3.return_value.assume_role.return_value = {
"Credentials": {"AccessKeyId": "access", "SecretAccessKey": "secret"}
}
runner = CliRunner()
with runner.isolated_filesystem():
filepath = str(tmpdir / "policy.json")
open(filepath, "w").write(CUSTOM_POLICY)
fixed_options = [
filepath if option == "POLICYFILEPATH" else option for option in options
]
args = [
"create",
"pytest-bucket-simonw-1",
"-c",
"--duration",
"15m",
] + fixed_options
kwargs = {}
if use_policy_stdin:
kwargs["input"] = CUSTOM_POLICY
result = runner.invoke(cli, args, **kwargs, catch_exceptions=False)
assert result.exit_code == 0
assert result.output == (
"Assume role against arn:::role for 900s\n"
'{\n "AccessKeyId": "access",\n "SecretAccessKey": "secret"\n}\n'
)
assert [str(c) for c in boto3.mock_calls] == [
"call('s3')",
"call('iam')",
"call('sts')",
"call().head_bucket(Bucket='pytest-bucket-simonw-1')",
"call().get_caller_identity()",
"call().get_role(RoleName='s3-credentials.AmazonS3FullAccess')",
"call().assume_role(RoleArn='arn:::role', RoleSessionName='s3.{fragment}.pytest-bucket-simonw-1', Policy='{policy}', DurationSeconds=900)".format(
fragment=expected_name_fragment,
policy=expected_policy.replace(
"$!BUCKET_NAME!$", "pytest-bucket-simonw-1"
),
),
]


def test_create_format_ini(mocker):
boto3 = mocker.patch("boto3.client")
boto3.return_value = Mock()
Expand Down

0 comments on commit f6fd163

Please sign in to comment.