Skip to content

Commit

Permalink
feat: adding admin features and kms create and delete
Browse files Browse the repository at this point in the history
  • Loading branch information
johnson2427 committed Apr 30, 2024
1 parent be35442 commit 2d0fe60
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 42 deletions.
2 changes: 2 additions & 0 deletions ape_aws/_cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import click

from ape_aws.admin._cli import admin
from ape_aws.kms._cli import kms


Expand All @@ -8,4 +9,5 @@ def cli():
"""Ape AWS CLI commands"""


cli.add_command(admin)
cli.add_command(kms)
Empty file added ape_aws/admin/__init__.py
Empty file.
35 changes: 35 additions & 0 deletions ape_aws/admin/_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import boto3
import click


@click.group("admin")
def admin():
"""Manage AWS Admin information"""


@admin.command()
def list_admins():
iam_client = boto3.client('iam')
response = iam_client.list_users()
admins = []
for user in response['Users']:
user_name = user['UserName']
user_policies = iam_client.list_attached_user_policies(UserName=user_name)
for policy in user_policies['AttachedPolicies']:
if policy['PolicyName'] == 'AdministratorAccess':
admins.append(user_name)

click.echo(f'Administrators: {admins}')


@admin.command()
def list_users():
iam_client = boto3.client('iam')
response = iam_client.list_users()
click.echo(f'Users: {response.get("Users")}')


@admin.command()
@click.option('--dictionary', '-d', multiple=True)
def check_this(dictionary):
print(dictionary)
97 changes: 55 additions & 42 deletions ape_aws/kms/_cli.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import boto3
import click

from ape.cli import ape_cli_context

from ape_aws.accounts import KmsAccount, AwsAccountContainer


Expand Down Expand Up @@ -37,61 +34,75 @@ def kms():


@kms.command()
@ape_cli_context
@click.argument("alias")
def account(alias):
"""
Import an existing Ethereum Private Key into AWS KMS
"""
aws_accounts = AwsAccountContainer(data_folder='./', account_type=KmsAccount)
kms_account = None
for account in list(aws_accounts.accounts):
if account.key_alias == alias:
kms_account = account

if not kms_account:
raise ValueError(f"No KMS Key with alias name {alias}")

return kms_account


@kms.command()
@ape_cli_context
@click.option(
"-a",
"--admin",
"administrators",
multiple=True,
help="Apply key policy to a list of administrators if applicable, ex. -a A1, -a A2",
metavar='list[str]',
)
@click.option(
"-u",
"--user",
"users",
multiple=True,
help="Apply key policy to a list of users if applicable, ex. -u U1, -u U2",
metavar='list[str]',
)
@click.option(
"-t",
"--tag",
"tags",
multiple=True,
help="Apply tags to the newly created KMS key, ex. -t k1=v1 -t k2=v2",
metavar='list[dict]'
)
@click.argument("alias_name")
@click.argument("description")
@click.argument("tag_key")
@click.argument("tag_value")
@click.option("-a", "--admin", "administrators", multiple=True, nargs="+")
@click.option("-u", "--user", "users", multiple=True, nargs="+")
def import_key(alias_name, description, tag_key, tag_value, administrators, users):
def create_key(
alias_name: str,
description: str,
administrators: list[str],
users: list[str],
tags: list[dict],
):
"""
Import an existing Ethereum Private Key into AWS KMS
Create an Ethereum Private Key in AWS KmsAccount
\b
Args:
alias_name str: The alias of the key you intend to create
description str: The description of the key you intend to create.
"""
aws_account = AwsAccountContainer(data_folder='./', account_type=KmsAccount)
response = aws_account.kms_client.create_key(
Description=description,
KeyUsage='SIGN_VERIFY',
KeySpec='ECC_SECG_P256K1',
Origin='External',
Origin='AWS_KMS',
MultiRegion=False,
)
key_id = response['KeyMetadata']['KeyId']
aws_account.kms_client.create_alias(
AliasName=f'alias/{alias_name}',
TargetKeyId=key_id,
)
aws_account.kms_client.tag_resource(
KeyId=key_id,
Tags=[{'TagKey': tag_key, 'TagValue': tag_value}],
)
# Note: Get the ARN from AWS
if tags:
tags_list = []
for k_v in tags:
k, v = k_v.split('=')
tags_list.append(dict(k=v))
aws_account.kms_client.tag_resource(
KeyId=key_id,
Tags=tags_list,
)
for arn in administrators:
aws_account.kms_client.put_key_policy(
KeyId=key_id,
PolicyName='default',
Policy=ADMIN_KEY_POLICY.format(arn=arn)
)
# Note: get ARN from AWS
for arn in users:
aws_account.kms_client.put_key_policy(
KeyId=key_id,
Expand All @@ -103,20 +114,22 @@ def import_key(alias_name, description, tag_key, tag_value, administrators, user


@kms.command()
@ape_cli_context
@click.argument("alias_name")
@click.option("--days", help="Number of days until key is deactivated")
def schedule_delete_key(alias_name, days=30):
@click.option("-d", "--days", default=30, help="Number of days until key is deactivated")
def schedule_delete_key(alias_name, days):
if "alias" not in alias_name:
alias_name = f"alias/{alias_name}"
aws_accounts = AwsAccountContainer(data_folder='./', account_type=KmsAccount)
kms_account = None
for account in list(aws_accounts.accounts):
for account in aws_accounts.accounts:
if account.key_alias == alias_name:
kms_account = account

if not kms_account:
raise ValueError(f"No KMS Key with alias name {alias_name}")

kms_account.kms_client.schedule_delete_key(
kms_account.kms_client.delete_alias(AliasName=alias_name)
kms_account.kms_client.schedule_key_deletion(
KeyId=kms_account.key_id, PendingWindowInDays=days
)
return kms_account
click.echo(f"Key {kms_account.key_alias} scheduled for deletion")
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@
"eth-utils>=2.3.1,<3",
"pydantic>=2.5.2,<3",
], # NOTE: Add 3rd party libraries here
entry_points={
"ape_cli_subcommands": ["ape_aws=ape_aws._cli:cli"]
},
python_requires=">=3.7,<4",
extras_require=extras_require,
py_modules=["ape_aws_kms"],
Expand Down

0 comments on commit 2d0fe60

Please sign in to comment.