diff --git a/.gitignore b/.gitignore index 4b5ae0b..4f1c24f 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,6 @@ scitt-signing-key.pem scitt-receipt.txt scitt/artifacts/_manifest/* my-signing-key.pem +receipt.cbor +signed-statement.cbor +transparent-statement.cbor diff --git a/scitt/dump_cbor.py b/scitt/dump_cbor.py new file mode 100755 index 0000000..6a7d16a --- /dev/null +++ b/scitt/dump_cbor.py @@ -0,0 +1,38 @@ +""" Module for dumping a CBOR file """ + +import argparse +from pprint import pprint +from pycose.messages import Sign1Message + + +def main(): + """Dumps content of a supposed CBOR file""" + + parser = argparse.ArgumentParser( + description="Dumps content of a supposed CBOR file" + ) + + # Signed Statement file + parser.add_argument( + "--input", + type=str, + help="filepath to the CBOR file.", + default="transparent-statement.cbor", + ) + + args = parser.parse_args() + + with open(args.input, "rb") as data_file: + data = data_file.read() + message = Sign1Message.decode(data) + print("\ncbor decoded cose sign1 statement:\n") + print("protected headers:") + pprint(message.phdr) + print("\nunprotected headers: ") + pprint(message.uhdr) + print("\npayload: ", message.payload) + print("payload hex: ", message.payload.hex()) + + +if __name__ == "__main__": + main() diff --git a/scitt/register_signed_statement.py b/scitt/register_signed_statement.py new file mode 100755 index 0000000..93523f3 --- /dev/null +++ b/scitt/register_signed_statement.py @@ -0,0 +1,247 @@ +""" Module for submitting a SCITT signed statement to the + DataTrails Transparency Service and optionally returning + a Transparent Statement """ + +import argparse +import logging +import os +import sys +from time import sleep as time_sleep + +from pycose.messages import Sign1Message +import requests + +# CWT header label comes from version 4 of the scitt architecture document +# https://www.ietf.org/archive/id/draft-ietf-scitt-architecture-04.html#name-issuer-identity +HEADER_LABEL_CWT = 13 + +# Various CWT header labels come from: +# https://www.rfc-editor.org/rfc/rfc8392.html#section-3.1 +HEADER_LABEL_CWT_ISSUER = 1 +HEADER_LABEL_CWT_SUBJECT = 2 + +# CWT CNF header labels come from: +# https://datatracker.ietf.org/doc/html/rfc8747#name-confirmation-claim +HEADER_LABEL_CWT_CNF = 8 +HEADER_LABEL_CNF_COSE_KEY = 1 + +# all timeouts and durations are in seconds +REQUEST_TIMEOUT = 30 +POLL_TIMEOUT = 60 +POLL_INTERVAL = 10 + + +def get_dt_auth_header(logger: logging.Logger) -> str: + """ + Get DataTrails bearer token from OIDC credentials in env + """ + # Pick up credentials from env + client_id = os.environ.get("DATATRAILS_CLIENT_ID") + client_secret = os.environ.get("DATATRAILS_CLIENT_SECRET") + + if client_id is None or client_secret is None: + logger.error( + "Please configure your DataTrails credentials in the shell environment" + ) + sys.exit(1) + + # Get token from the auth endpoint + response = requests.post( + "https://app.datatrails.ai/archivist/iam/v1/appidp/token", + data={ + "grant_type": "client_credentials", + "client_id": client_id, + "client_secret": client_secret, + }, + timeout=REQUEST_TIMEOUT, + ) + if response.status_code != 200: + logger.error("FAILED to acquire bearer token") + logger.debug(response) + sys.exit(1) + + # Format as a request header + res = response.json() + return f'{res["token_type"]} {res["access_token"]}' + + +def submit_statement( + statement_file_path: str, headers: dict, logger: logging.Logger +) -> str: + """ + Given a Signed Statement CBOR file on disk, register it on the DataTrails + Transparency Service over the SCITT interface + """ + # Read the binary data from the file + with open(statement_file_path, "rb") as data_file: + data = data_file.read() + + # Make the POST request + response = requests.post( + "https://app.datatrails.ai/archivist/v1/publicscitt/entries", + headers=headers, + data=data, + timeout=REQUEST_TIMEOUT, + ) + if response.status_code != 200: + logger.error("FAILED to submit statement") + logger.debug(response) + sys.exit(1) + + # Make sure it's actually in process and wil work + res = response.json() + if not "operationID" in res: + logger.error("FAILED No OperationID locator in response") + logger.debug(res) + sys.exit(1) + + return res["operationID"] + + +def get_operation_status(operation_id: str, headers: dict) -> dict: + """ + Gets the status of a long-running registration operation + """ + response = requests.get( + f"https://app.datatrails.ai/archivist/v1/publicscitt/operations/{operation_id}", + headers=headers, + timeout=REQUEST_TIMEOUT, + ) + + response.raise_for_status() + + return response.json() + + +def wait_for_entry_id(operation_id: str, headers: dict, logger: logging.Logger) -> str: + """ + Polls for the operation status to be 'succeeded'. + """ + + poll_attempts: int = int(POLL_TIMEOUT / POLL_INTERVAL) + + logger.info("starting to poll for operation status 'succeeded'") + + for _ in range(poll_attempts): + + try: + operation_status = get_operation_status(operation_id, headers) + + # pylint: disable=fixme + # TODO: ensure get_operation_status handles error cases from the rest request + if ( + "status" in operation_status + and operation_status["status"] == "succeeded" + ): + return operation_status["entryID"] + + except requests.HTTPError as e: + logger.debug("failed getting operation status, error: %s", e) + + time_sleep(POLL_INTERVAL) + + raise TimeoutError("signed statement not registered within polling duration") + + +def attach_receipt( + entry_id: str, + signed_statement_filepath: str, + transparent_statement_file_path: str, + headers: dict, + logger: logging.Logger, +): + """ + Given a Signed Statement and a corresponding Entry ID, fetch a Receipt from + the Transparency Service and write out a complete Transparent Statement + """ + # Get the receipt + response = requests.get( + f"https://app.datatrails.ai/archivist/v1/publicscitt/entries/{entry_id}/receipt", + headers=headers, + timeout=REQUEST_TIMEOUT, + ) + if response.status_code != 200: + logger.error("FAILED to get receipt") + logger.debug(response) + sys.exit(1) + + logger.debug(response.content) + + # Open up the signed statement + with open(signed_statement_filepath, "rb") as data_file: + data = data_file.read() + message = Sign1Message.decode(data) + logger.debug(message) + + # Add receipt to the unprotected header and re-encode + message.uhdr["receipts"] = [response.content] + ts = message.encode(sign=False) + + # Write out the updated Transparent Statement + with open(transparent_statement_file_path, "wb") as file: + file.write(ts) + logger.info("File saved successfully") + + +def main(): + """Creates a Transparent Statement""" + + parser = argparse.ArgumentParser(description="Create a signed statement.") + + # Signed Statement file + parser.add_argument( + "--signed-statement-file", + type=str, + help="filepath to the Signed Statement to be registered.", + default="signed-statement.cbor", + ) + + # Output file + parser.add_argument( + "--output-file", + type=str, + help="output file to store the Transparent Statement (leave blank to skip saving).", + default="", + ) + + # log level + parser.add_argument( + "--log-level", + type=str, + help="log level. for any individual poll errors use DEBUG, defaults to WARNING", + default="WARNING", + ) + + args = parser.parse_args() + + logger = logging.getLogger("check operation status") + logging.basicConfig(level=logging.getLevelName(args.log_level)) + + # Get auth + auth_headers = {"Authorization": get_dt_auth_header(logger)} + + # Submit Signed Statement to DataTrails + op_id = submit_statement(args.signed_statement_file, auth_headers, logger) + logging.info("Successfully submitted with Operation ID %s", op_id) + + # If the client wants the Transparent Statement, wait for it + if args.output_file != "": + logging.info("Now waiting for registration to complete") + + # Wait for the registration to complete + try: + entry_id = wait_for_entry_id(op_id, auth_headers, logger) + except TimeoutError as e: + logger.error(e) + sys.exit(1) + + logger.info("Fully Registered with Entry ID %s", entry_id) + + # Attach the receipt + attach_receipt( + entry_id, args.signed_statement_file, args.output_file, auth_headers, logger + ) + + +if __name__ == "__main__": + main() diff --git a/scitt/verify_receipt_signature.py b/scitt/verify_receipt_signature.py index a578c4a..7b6e48d 100644 --- a/scitt/verify_receipt_signature.py +++ b/scitt/verify_receipt_signature.py @@ -19,14 +19,23 @@ HEADER_LABEL_DID = 391 -def open_receipt(receipt_file: str) -> bytes: +def read_cbor_file(cbor_file: str) -> Sign1Message: """ opens the receipt from the receipt file. NOTE: the receipt is expected to be in cbor encoding. """ - with open(receipt_file, "rb") as file: - receipt = file.read() - return receipt + with open(cbor_file, "rb") as file: + contents = file.read() + + # decode the cbor encoded cose sign1 message + try: + cose_object = Sign1Message.decode(contents) + except (ValueError, AttributeError): + # This is fatal + print("failed to decode cose sign1 from file", file=sys.stderr) + sys.exit(1) + + return cose_object def get_didweb_pubkey(didurl: str, kid: bytes) -> dict: @@ -92,53 +101,79 @@ def get_didweb_pubkey(didurl: str, kid: bytes) -> dict: raise ValueError(f"no key with kid: {kid} in verification methods of did document") -def verify_receipt(receipt: bytes) -> bool: +def verify_receipt(receipt: Sign1Message) -> bool: """ verifies the counter signed receipt signature """ - # decode the cbor encoded cose sign1 message - try: - message = Sign1Message.decode(receipt) - except (ValueError, AttributeError): - print("failed to decode cose sign1 receipt", file=sys.stderr) - return False - # get the verification key from didweb - kid: bytes = message.phdr[KID] - didurl = message.phdr[HEADER_LABEL_DID] + kid: bytes = receipt.phdr[KID] + didurl = receipt.phdr[HEADER_LABEL_DID] cose_key_dict = get_didweb_pubkey(didurl, kid) cose_key = CoseKey.from_dict(cose_key_dict) - message.key = cose_key + receipt.key = cose_key # verify the counter signed receipt signature - verified = message.verify_signature() + verified = receipt.verify_signature() # type: ignore return verified +def verify_transparent_statement(transparent_statement: Sign1Message) -> bool: + """ + verifies the counter signed receipt signature in a TS + """ + + # Pull the receipt out of the structure + try: + receipt_bytes = transparent_statement.uhdr["receipts"][0] + except (ValueError, AttributeError, KeyError): + print("failed to extract receipt from Transparent Statement", file=sys.stderr) + return False + + # Re-constitute it as a COSE object + try: + receipt = Sign1Message.decode(receipt_bytes) + except (ValueError, AttributeError): + print("failed to extract receipt from Transparent Statement", file=sys.stderr) + return False + + # Verify it + return verify_receipt(receipt) + + def main(): """Verifies a counter signed receipt signature""" parser = argparse.ArgumentParser( - description="Verify a counter signed receipt signature." + description="Verify countersigned signature from a Receipt or Transparent Statement." ) - # signing key file - parser.add_argument( + options = parser.add_argument_group("Input File Type") + options.add_argument( "--receipt-file", type=str, - help="filepath to the stored receipt, in cbor format.", - default="scitt-receipt.cbor", + help="filepath to a stored Receipt, in CBOR format.", + ) + options.add_argument( + "--transparent-statement-file", + type=str, + help="filepath to a stored Transparent Statement, in CBOR format.", + default="transparent-statement.cbor", ) args = parser.parse_args() - receipt = open_receipt(args.receipt_file) - - verified = verify_receipt(receipt) + if args.receipt_file: + receipt = read_cbor_file(args.receipt_file) + verified = verify_receipt(receipt) + else: + # Note this logic works because only the transparent statement arg + # has a default. Don't change that without changing this! + transparent_statement = read_cbor_file(args.transparent_statement_file) + verified = verify_transparent_statement(transparent_statement) if verified: print("signature verification succeeded") diff --git a/unittests/test_verify_receipt_signature.py b/unittests/test_verify_receipt_signature.py index 5f773d9..29d93a7 100644 --- a/unittests/test_verify_receipt_signature.py +++ b/unittests/test_verify_receipt_signature.py @@ -4,7 +4,7 @@ import unittest -from scitt.verify_receipt_signature import verify_receipt, open_receipt +from scitt.verify_receipt_signature import verify_receipt, read_cbor_file from .constants import KNOWN_RECEIPT_FILE @@ -19,7 +19,7 @@ def test_verify_kat_receipt(self): """ tests we can verify the signature of a known receipt. """ - receipt = open_receipt(KNOWN_RECEIPT_FILE) + receipt = read_cbor_file(KNOWN_RECEIPT_FILE) verified = verify_receipt(receipt)