diff --git a/README.md b/README.md index 00f7636..3565a1b 100644 --- a/README.md +++ b/README.md @@ -42,13 +42,37 @@ ape aws -h To create a new key: ```bash -ape aws kms create 'KeyAlias' 'Description of new key' +ape aws kms create KeyAlias -d 'Description of new key' ``` To delete this key: ```bash -ape aws kms delete 'KeyAlias' +ape aws kms delete KeyAlias +``` + +To import an existing private key into KMS: + +```bash +$ ape aws kms import KeyAlias +Enter your private key: +SUCCESS: Key imported successfully with ID: +``` + +You can also import a private key from a file (from hex or bytes): + +```bash +$ ape aws kms import KeyAlias --private-key +INFO: Reading private key from +SUCCESS: Key imported successfully with ID: +``` + +You can import using a mnemonic phrase as well: + +```bash +$ ape aws kms import KeyAlias --use-mnemonic +Enter your mnemonic phrase: +SUCCESS: Key imported successfully with ID: ``` ### IPython diff --git a/ape_aws/accounts.py b/ape_aws/accounts.py index 8c6ddde..59ba18e 100644 --- a/ape_aws/accounts.py +++ b/ape_aws/accounts.py @@ -14,9 +14,10 @@ class AwsAccountContainer(AccountContainerAPI): + @property def aliases(self) -> Iterator[str]: - return map(lambda x: x.alias, kms_client.raw_aliases) + return map(lambda x: x.alias.replace("alias/", ""), kms_client.raw_aliases) def __len__(self) -> int: return len(kms_client.raw_aliases) @@ -25,7 +26,7 @@ def __len__(self) -> int: def accounts(self) -> Iterator[AccountAPI]: return map( lambda x: KmsAccount( - key_alias=x.alias, + key_alias=x.alias.replace("alias/", ""), key_id=x.key_id, key_arn=x.arn, ), @@ -40,7 +41,7 @@ class KmsAccount(AccountAPI): @property def alias(self) -> str: - return self.key_alias.replace("alias/", "") + return self.key_alias @property def public_key(self): diff --git a/ape_aws/client.py b/ape_aws/client.py index d40e06b..89a271e 100644 --- a/ape_aws/client.py +++ b/ape_aws/client.py @@ -2,7 +2,11 @@ from typing import ClassVar import boto3 # type: ignore[import] -from pydantic import BaseModel, Field +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import ec, padding +from eth_account import Account +from pydantic import BaseModel, ConfigDict, Field, field_validator class AliasResponse(BaseModel): @@ -15,10 +19,11 @@ class AliasResponse(BaseModel): class KeyBaseModel(BaseModel): alias: str + model_config = ConfigDict(populate_by_name=True) class CreateKeyModel(KeyBaseModel): - description: str = Field(alias="Description") + description: str | None = Field(default=None, alias="Description") policy: str | None = Field(default=None, alias="Policy") key_usage: str = Field(default="SIGN_VERIFY", alias="KeyUsage") key_spec: str = Field(default="ECC_SECG_P256K1", alias="KeySpec") @@ -67,10 +72,89 @@ class CreateKey(CreateKeyModel): origin: str = Field(default="AWS_KMS", alias="Origin") -class ImportKey(CreateKeyModel): +class ImportKeyRequest(CreateKeyModel): origin: str = Field(default="EXTERNAL", alias="Origin") +class ImportKey(ImportKeyRequest): + key_id: str = Field(default=None, alias="KeyId") + public_key: bytes = Field(default=None, alias="PublicKey") + private_key: str | bytes = Field(default=None, alias="PrivateKey") + import_token: bytes = Field(default=None, alias="ImportToken") + + @field_validator("private_key") + def validate_private_key(cls, value): + if value.startswith("0x"): + value = value[2:] + return value + + @property + def get_account(self): + return Account.privateKeyToAccount(self.private_key) + + @property + def ec_private_key(self): + loaded_key = self.private_key + if isinstance(loaded_key, bytes): + loaded_key = ec.derive_private_key(int(self.private_key, 16), ec.SECP256K1()) + elif isinstance(loaded_key, str): + loaded_key = bytes.fromhex(loaded_key[2:]) + loaded_key = ec.derive_private_key(int(self.private_key, 16), ec.SECP256K1()) + return loaded_key + + @property + def private_key_hex(self): + if isinstance(self.private_key, str): + return self.private_key + elif isinstance(self.private_key, bytes): + return self.private_key.hex() + return self.private_key.private_numbers().private_value.to_bytes(32, "big").hex() + + @property + def private_key_bin(self): + """ + Returns the private key in binary format + This is required for the `boto3.client.import_key_material` method + """ + return self.ec_private_key.private_bytes( + encoding=serialization.Encoding.DER, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + + @property + def private_key_pem(self): + """ + Returns the private key in PEM format for use in outside applications. + """ + return self.ec_private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + + @property + def public_key_der(self): + return serialization.load_der_public_key( + self.public_key, + backend=default_backend(), + ) + + @property + def encrypted_private_key(self): + if not self.public_key: + raise ValueError("Public key not found") + + return self.public_key_der.encrypt( + self.private_key_bin, + padding.OAEP( + mgf=padding.MGF1(hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None, + ), + ) + + class DeleteKey(KeyBaseModel): key_id: str days: int = 30 @@ -103,8 +187,9 @@ def sign(self, key_id, msghash): ) return response.get("Signature") - def create_key(self, key_spec: CreateKey): + def create_key(self, key_spec: CreateKey | ImportKeyRequest): response = self.client.create_key(**key_spec.to_aws_dict()) + key_id = response["KeyMetadata"]["KeyId"] self.client.create_alias( AliasName=f"alias/{key_spec.alias}", @@ -131,6 +216,21 @@ def create_key(self, key_spec: CreateKey): ) return key_id + def import_key(self, key_spec: ImportKey): + return self.client.import_key_material( + KeyId=key_spec.key_id, + ImportToken=key_spec.import_token, + EncryptedKeyMaterial=key_spec.encrypted_private_key, + ExpirationModel="KEY_MATERIAL_DOES_NOT_EXPIRE", + ) + + def get_parameters(self, key_id: str): + return self.client.get_parameters_for_import( + KeyId=key_id, + WrappingAlgorithm="RSAES_OAEP_SHA_256", + WrappingKeySpec="RSA_2048", + ) + def delete_key(self, key_spec: DeleteKey): self.client.delete_alias(AliasName=key_spec.alias) self.client.schedule_key_deletion(KeyId=key_spec.key_id, PendingWindowInDays=key_spec.days) diff --git a/ape_aws/kms/_cli.py b/ape_aws/kms/_cli.py index c6145e7..b3b761e 100644 --- a/ape_aws/kms/_cli.py +++ b/ape_aws/kms/_cli.py @@ -1,7 +1,11 @@ +from pathlib import Path + import click from ape.cli import ape_cli_context +from eth_account import Account as EthAccount +from eth_account.hdaccount import ETHEREUM_DEFAULT_PATH -from ape_aws.client import CreateKey, DeleteKey, kms_client +from ape_aws.client import CreateKey, DeleteKey, ImportKey, ImportKeyRequest, kms_client @click.group("kms") @@ -17,7 +21,6 @@ def kms(): "administrators", multiple=True, help="Apply key policy to a list of administrators if applicable, ex. -a ARN1, -a ARN2", - metavar="list[ARN]", ) @click.option( "-u", @@ -25,16 +28,20 @@ def kms(): "users", multiple=True, help="Apply key policy to a list of users if applicable, ex. -u ARN1, -u ARN2", - metavar="list[ARN]", +) +@click.option( + "-d", + "--description", + "description", + help="The description of the key you intend to create.", ) @click.argument("alias_name") -@click.argument("description") def create_key( cli_ctx, alias_name: str, - description: str, administrators: list[str], users: list[str], + description: str, ): """ Create an Ethereum Private Key in AWS KmsAccount @@ -54,7 +61,100 @@ def create_key( cli_ctx.logger.success(f"Key created successfully with ID: {key_id}") -# TODO: Add `ape aws kms import` +@kms.command(name="import") +@ape_cli_context() +@click.option( + "-p", + "--private-key", + "private_key_path", + type=click.Path(), + help="The private key you intend to import", +) +@click.option( + "-a", + "--admin", + "administrators", + multiple=True, + help="Apply key policy to a list of administrators if applicable, ex. -a ARN1, -a ARN2", +) +@click.option( + "-u", + "--user", + "users", + multiple=True, + help="Apply key policy to a list of users if applicable, ex. -u ARN1, -u ARN2", +) +@click.option( + "-d", + "--description", + "description", + help="The description of the key you intend to create.", +) +@click.option( + "--use-mnemonic", + "import_from_mnemonic", + help="Import a key from a mnemonic phrase", + is_flag=True, +) +@click.option( + "--hd-path", + "hd_path", + help="The hierarchical deterministic path to derive the key from", + default=ETHEREUM_DEFAULT_PATH, +) +@click.argument("alias_name") +def import_key( + cli_ctx, + alias_name: str, + private_key_path: Path, + administrators: list[str], + users: list[str], + description: str, + import_from_mnemonic: bool, + hd_path: str, +): + if private_key_path: + if isinstance(private_key_path, str): + private_key_path = Path(private_key_path) + if private_key_path.exists() and private_key_path.is_file(): + cli_ctx.logger.info(f"Reading private key from {private_key_path}") + private_key = private_key_path.read_text().strip() + + elif import_from_mnemonic: + mnemonic = click.prompt("Enter your mnemonic phrase", hide_input=True) + EthAccount.enable_unaudited_hdwallet_features() + account = EthAccount.from_mnemonic(mnemonic, account_path=hd_path) + private_key = account.key.hex() + + else: + private_key = click.prompt("Enter your private key", hide_input=True) + + key_spec = ImportKeyRequest( + alias=alias_name, + description=description, # type: ignore + admins=administrators, + users=users, + ) + key_id = kms_client.create_key(key_spec) + create_key_response = kms_client.get_parameters(key_id) + public_key = create_key_response["PublicKey"] + import_token = create_key_response["ImportToken"] + import_key_spec = ImportKey( + **key_spec.model_dump(), + key_id=key_id, # type: ignore + public_key=public_key, # type: ignore + private_key=private_key, # type: ignore + import_token=import_token, # type: ignore + ) + try: + response = kms_client.import_key(import_key_spec) + if response["ResponseMetadata"]["HTTPStatusCode"] != 200: + cli_ctx.abort(f"Key failed to import into KMS, {response['Error']}") + cli_ctx.logger.success(f"Key imported successfully with ID: {key_id}") + except Exception as e: + cli_ctx.logger.error(f"Key failed to import into KMS: {e}") + + # TODO: Add `ape aws kms sign-message [message]` # TODO: Add `ape aws kms verify-message [message] [hex-signature]` diff --git a/setup.py b/setup.py index 8950561..6a2a10b 100644 --- a/setup.py +++ b/setup.py @@ -57,6 +57,7 @@ "boto3>=1.34.79,<2", "eth-ape>=0.8.2,<0.9", "ecdsa>=0.19.0,<1", + "cryptography>=37.0.4,<38", ], # NOTE: Add 3rd party libraries here entry_points={"ape_cli_subcommands": ["ape_aws=ape_aws._cli:cli"]}, python_requires=">=3.7,<4",