From ae5e3796bda59e0877bf250faf823bfa98e77ffc Mon Sep 17 00:00:00 2001 From: Jon Geater Date: Sat, 20 Jul 2024 15:14:09 -0700 Subject: [PATCH 01/12] Add all-in-one script to register, get receipt, and make merged Transparent Statement --- .gitignore | 2 + scitt/dump_cbor.py | 30 ++++ scitt/register_signed_statement.py | 254 +++++++++++++++++++++++++++++ 3 files changed, 286 insertions(+) create mode 100755 scitt/dump_cbor.py create mode 100755 scitt/register_signed_statement.py diff --git a/.gitignore b/.gitignore index 4b5ae0b..1533655 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ scitt-signing-key.pem scitt-receipt.txt scitt/artifacts/_manifest/* my-signing-key.pem +signed-statement.cbor +transparent-statement.cbor diff --git a/scitt/dump_cbor.py b/scitt/dump_cbor.py new file mode 100755 index 0000000..18d94ef --- /dev/null +++ b/scitt/dump_cbor.py @@ -0,0 +1,30 @@ +""" Module for dumping a CBOR file """ + +import argparse + +from pycose.messages import Sign1Message + +def main(): + """Dumps content of a supposed CBOR file""" + + parser = argparse.ArgumentParser(description="Dumps content of a supposed CBOR file") + + # Signed Statement file + parser.add_argument( + "--input", + type=str, + help="filepath to the CBOR file.", + default="transparent-statement.cbor", + ) + + args = parser.parse_args() + + with open(args.input, 'rb') as data_file: + data = data_file.read() + message = Sign1Message.decode(data) + print(message) + print(f'Protected Header: {message.phdr}') + print(f'Unprotected Header: {message.uhdr}') + +if __name__ == "__main__": + main() diff --git a/scitt/register_signed_statement.py b/scitt/register_signed_statement.py new file mode 100755 index 0000000..00db447 --- /dev/null +++ b/scitt/register_signed_statement.py @@ -0,0 +1,254 @@ +""" Module for submitting a SCITT signed statement to the + DataTrails Transparency Service and optionally returning + a Transparent Statement """ + +import hashlib +import json +import argparse +import os +from io import BytesIO +import requests +import logging +from time import sleep as time_sleep + +from typing import Optional + +from pycose.messages import Sign1Message +from pycose.headers import Algorithm, KID, ContentType +from pycose.algorithms import Es256 +from pycose.keys.curves import P256 +from pycose.keys.keyparam import KpKty, EC2KpD, EC2KpX, EC2KpY, KpKeyOps, EC2KpCurve +from pycose.keys.keytype import KtyEC2 +from pycose.keys.keyops import SignOp, VerifyOp +from pycose.keys import CoseKey + +from ecdsa import SigningKey, VerifyingKey + +# CWT header label comes from version 4 of the scitt architecture document +# https://www.ietf.org/archive/id/draft-ietf-scitt-architecture-04.html#name-issuer-identity +HEADER_LABEL_CWT = 13 + +# Various CWT header labels come from: +# https://www.rfc-editor.org/rfc/rfc8392.html#section-3.1 +HEADER_LABEL_CWT_ISSUER = 1 +HEADER_LABEL_CWT_SUBJECT = 2 + +# CWT CNF header labels come from: +# https://datatracker.ietf.org/doc/html/rfc8747#name-confirmation-claim +HEADER_LABEL_CWT_CNF = 8 +HEADER_LABEL_CNF_COSE_KEY = 1 + +# all timeouts and durations are in seconds +REQUEST_TIMEOUT = 30 +POLL_TIMEOUT = 60 +POLL_INTERVAL = 10 + +def get_dt_auth_header( + logger: logging.Logger +) -> str: + # Pick up credentials from env + client_id = os.environ.get("DATATRAILS_CLIENT_ID") + client_secret = os.environ.get("DATATRAILS_CLIENT_SECRET") + + if client_id is None or client_secret is None: + logger.error("Please configure your DataTrails credentials in the shell environment") + exit(1) + + # Get token from the auth endpoint + response = requests.post( + "https://app.datatrails.ai/archivist/iam/v1/appidp/token", + data={ + 'grant_type': 'client_credentials', + 'client_id': client_id, + 'client_secret': client_secret + } + ) + if response.status_code != 200: + logger.error("FAILED to acquire bearer token") + logger.debug(response) + exit(1) + + # Format as a request header + res = response.json() + return f'{res["token_type"]} {res["access_token"]}' + +def submit_statement( + statement_file_path: str, + headers: dict, + logger: logging.Logger +) -> str: + # Read the binary data from the file + with open(statement_file_path, 'rb') as data_file: + data = data_file.read() + + # Make the POST request + response = requests.post( + 'https://app.datatrails.ai/archivist/v1/publicscitt/entries', + headers=headers, + data=data + ) + if response.status_code != 200: + logger.error("FAILED to submit statement") + logger.debug(response) + exit(1) + + # Make sure it's actually in process and wil work + res = response.json() + if not "operationID" in res: + logger.error("FAILED No OperationID locator in response") + logger.debug(res) + exit(1) + + return res["operationID"] + +def get_operation_status( + operation_id: str, + headers: dict +) -> dict: + """ + gets the operation status from the datatrails API for retrieving operation status + """ + + url = ( + f"https://app.datatrails.ai/archivist/v1/publicscitt/operations/{operation_id}" + ) + + response = requests.get(url, timeout=30, headers=headers) + response.raise_for_status() + + return response.json() + +def wait_for_entry_id( + operation_id: str, + headers: dict, + logger: logging.Logger +) -> str: + """ + polls for the operation status to be 'succeeded'. + """ + + poll_attempts: int = int(POLL_TIMEOUT / POLL_INTERVAL) + + logger.info("starting to poll for operation status 'succeeded'") + + for _ in range(poll_attempts): + + try: + operation_status = get_operation_status(operation_id, headers) + + # pylint: disable=fixme + # TODO: ensure get_operation_status handles error cases from the rest request + if ( + "status" in operation_status + and operation_status["status"] == "succeeded" + ): + return operation_status["entryID"] + + except requests.HTTPError as e: + logger.debug("failed getting operation status, error: %s", e) + + time_sleep(POLL_INTERVAL) + + raise TimeoutError("signed statement not registered within polling duration") + +def attach_receipt( + entry_id: str, + signed_statement_filepath: str, + transparent_statement_file_path: str, + headers: dict, + logger: logging.Logger +): + # Get the receipt + response = requests.get( + f'https://app.datatrails.ai/archivist/v1/publicscitt/entries/{entry_id}/receipt', + headers=headers + ) + if response.status_code != 200: + logger.error("FAILED to get receipt") + logger.debug(response) + exit(1) + + logger.debug(response.content) + + # Open up the signed statement + with open(signed_statement_filepath, 'rb') as data_file: + data = data_file.read() + message = Sign1Message.decode(data) + logger.debug(message) + + # Add receipt to the unprotected header and re-encode + message.uhdr['receipts'] = [response.content] + ts = Sign1Message.encode(message, sign=False) + + # Write out the updated Transparent Statement + with open(transparent_statement_file_path, 'wb') as file: + file.write(ts) + logger.info("File saved successfully") + +def main(): + """Creates a signed statement""" + + parser = argparse.ArgumentParser(description="Create a signed statement.") + + # Signed Statement file + parser.add_argument( + "--signed-statement-file", + type=str, + help="filepath to the Signed Statement to be registered.", + default="signed-statement.cbor", + ) + + # Output file + parser.add_argument( + "--output-file", + type=str, + help="output file to store the Transparent Statement (leave blank to skip saving).", + default="", + ) + + # log level + parser.add_argument( + "--log-level", + type=str, + help="log level. for any individual poll errors use DEBUG, defaults to WARNING", + default="WARNING", + ) + + args = parser.parse_args() + + logger = logging.getLogger("check operation status") + logging.basicConfig(level=logging.getLevelName(args.log_level)) + + # Get auth + auth_headers = { + 'Authorization': get_dt_auth_header(logger) + } + + # Submit Signed Statement to DataTrails + op_id = submit_statement(args.signed_statement_file, auth_headers, logger) + logging.info(f'Successfully submitted with Operation ID {op_id}') + + # If the client wants the Transparent Statement, wait for it + if args.output_file != "" : + logging.info(f'Now waiting for registration to complete') + + # Wait for the registration to complete + try: + entry_id = wait_for_entry_id(op_id, auth_headers, logger) + except TimeoutError as e: + logger.error(e) + exit(1) + + logger.info(f'Fully Registered with Entry ID {entry_id}') + + # Attach the receipt + attach_receipt( + entry_id, + args.signed_statement_file, + args.output_file, + auth_headers, + logger + ) + +if __name__ == "__main__": + main() From ae9767283c78c541f237bb0a8d9719a8bcbfbe2d Mon Sep 17 00:00:00 2001 From: Jon Geater Date: Sat, 20 Jul 2024 15:25:01 -0700 Subject: [PATCH 02/12] Nicer formatting in dump_cbor --- scitt/dump_cbor.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/scitt/dump_cbor.py b/scitt/dump_cbor.py index 18d94ef..990987a 100755 --- a/scitt/dump_cbor.py +++ b/scitt/dump_cbor.py @@ -1,7 +1,7 @@ """ Module for dumping a CBOR file """ import argparse - +from pprint import pprint from pycose.messages import Sign1Message def main(): @@ -22,9 +22,13 @@ def main(): with open(args.input, 'rb') as data_file: data = data_file.read() message = Sign1Message.decode(data) - print(message) - print(f'Protected Header: {message.phdr}') - print(f'Unprotected Header: {message.uhdr}') + print("\ncbor decoded cose sign1 statement:\n") + print("protected headers:") + pprint(message.phdr) + print("\nunprotected headers: ") + pprint(message.uhdr) + print("\npayload: ", message.payload) + print("payload hex: ", message.payload.hex()) if __name__ == "__main__": main() From 69a11136950129323e1a0c8eac204e6fd7dfbc22 Mon Sep 17 00:00:00 2001 From: Jon Geater Date: Sat, 20 Jul 2024 15:43:45 -0700 Subject: [PATCH 03/12] Linting --- scitt/dump_cbor.py | 2 ++ scitt/register_signed_statement.py | 11 +++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/scitt/dump_cbor.py b/scitt/dump_cbor.py index 990987a..a111643 100755 --- a/scitt/dump_cbor.py +++ b/scitt/dump_cbor.py @@ -4,6 +4,7 @@ from pprint import pprint from pycose.messages import Sign1Message + def main(): """Dumps content of a supposed CBOR file""" @@ -30,5 +31,6 @@ def main(): print("\npayload: ", message.payload) print("payload hex: ", message.payload.hex()) + if __name__ == "__main__": main() diff --git a/scitt/register_signed_statement.py b/scitt/register_signed_statement.py index 00db447..696fc67 100755 --- a/scitt/register_signed_statement.py +++ b/scitt/register_signed_statement.py @@ -43,6 +43,7 @@ POLL_TIMEOUT = 60 POLL_INTERVAL = 10 + def get_dt_auth_header( logger: logging.Logger ) -> str: @@ -72,6 +73,7 @@ def get_dt_auth_header( res = response.json() return f'{res["token_type"]} {res["access_token"]}' + def submit_statement( statement_file_path: str, headers: dict, @@ -101,6 +103,7 @@ def submit_statement( return res["operationID"] + def get_operation_status( operation_id: str, headers: dict @@ -118,6 +121,7 @@ def get_operation_status( return response.json() + def wait_for_entry_id( operation_id: str, headers: dict, @@ -151,6 +155,7 @@ def wait_for_entry_id( raise TimeoutError("signed statement not registered within polling duration") + def attach_receipt( entry_id: str, signed_statement_filepath: str, @@ -167,7 +172,7 @@ def attach_receipt( logger.error("FAILED to get receipt") logger.debug(response) exit(1) - + logger.debug(response.content) # Open up the signed statement @@ -180,11 +185,12 @@ def attach_receipt( message.uhdr['receipts'] = [response.content] ts = Sign1Message.encode(message, sign=False) - # Write out the updated Transparent Statement + # Write out the updated Transparent Statement with open(transparent_statement_file_path, 'wb') as file: file.write(ts) logger.info("File saved successfully") + def main(): """Creates a signed statement""" @@ -250,5 +256,6 @@ def main(): logger ) + if __name__ == "__main__": main() From a59281448372b6388c313a3127ebaf98f526ba0c Mon Sep 17 00:00:00 2001 From: Jon Geater Date: Sat, 20 Jul 2024 16:07:42 -0700 Subject: [PATCH 04/12] More linting --- scitt/register_signed_statement.py | 72 +++++++++++++++--------------- 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/scitt/register_signed_statement.py b/scitt/register_signed_statement.py index 696fc67..a67e98c 100755 --- a/scitt/register_signed_statement.py +++ b/scitt/register_signed_statement.py @@ -2,27 +2,14 @@ DataTrails Transparency Service and optionally returning a Transparent Statement """ -import hashlib -import json import argparse -import os -from io import BytesIO -import requests import logging +import os +import sys from time import sleep as time_sleep -from typing import Optional - from pycose.messages import Sign1Message -from pycose.headers import Algorithm, KID, ContentType -from pycose.algorithms import Es256 -from pycose.keys.curves import P256 -from pycose.keys.keyparam import KpKty, EC2KpD, EC2KpX, EC2KpY, KpKeyOps, EC2KpCurve -from pycose.keys.keytype import KtyEC2 -from pycose.keys.keyops import SignOp, VerifyOp -from pycose.keys import CoseKey - -from ecdsa import SigningKey, VerifyingKey +import requests # CWT header label comes from version 4 of the scitt architecture document # https://www.ietf.org/archive/id/draft-ietf-scitt-architecture-04.html#name-issuer-identity @@ -47,13 +34,16 @@ def get_dt_auth_header( logger: logging.Logger ) -> str: + """ + Get DataTrails bearer token from OIDC credentials in env + """ # Pick up credentials from env client_id = os.environ.get("DATATRAILS_CLIENT_ID") client_secret = os.environ.get("DATATRAILS_CLIENT_SECRET") if client_id is None or client_secret is None: logger.error("Please configure your DataTrails credentials in the shell environment") - exit(1) + sys.exit(1) # Get token from the auth endpoint response = requests.post( @@ -62,12 +52,13 @@ def get_dt_auth_header( 'grant_type': 'client_credentials', 'client_id': client_id, 'client_secret': client_secret - } + }, + timeout=REQUEST_TIMEOUT ) if response.status_code != 200: logger.error("FAILED to acquire bearer token") logger.debug(response) - exit(1) + sys.exit(1) # Format as a request header res = response.json() @@ -79,6 +70,10 @@ def submit_statement( headers: dict, logger: logging.Logger ) -> str: + """ + Given a Signed Statement CBOR file on disk, register it on the DataTrails + Transparency Service over the SCITT interface + """ # Read the binary data from the file with open(statement_file_path, 'rb') as data_file: data = data_file.read() @@ -87,19 +82,20 @@ def submit_statement( response = requests.post( 'https://app.datatrails.ai/archivist/v1/publicscitt/entries', headers=headers, - data=data + data=data, + timeout=REQUEST_TIMEOUT ) if response.status_code != 200: logger.error("FAILED to submit statement") logger.debug(response) - exit(1) + sys.exit(1) # Make sure it's actually in process and wil work res = response.json() if not "operationID" in res: logger.error("FAILED No OperationID locator in response") logger.debug(res) - exit(1) + sys.exit(1) return res["operationID"] @@ -109,14 +105,13 @@ def get_operation_status( headers: dict ) -> dict: """ - gets the operation status from the datatrails API for retrieving operation status + Gets the status of a long-running registration operation """ + response = requests.get( + f"https://app.datatrails.ai/archivist/v1/publicscitt/operations/{operation_id}", + headers=headers, + timeout=REQUEST_TIMEOUT) - url = ( - f"https://app.datatrails.ai/archivist/v1/publicscitt/operations/{operation_id}" - ) - - response = requests.get(url, timeout=30, headers=headers) response.raise_for_status() return response.json() @@ -128,7 +123,7 @@ def wait_for_entry_id( logger: logging.Logger ) -> str: """ - polls for the operation status to be 'succeeded'. + Polls for the operation status to be 'succeeded'. """ poll_attempts: int = int(POLL_TIMEOUT / POLL_INTERVAL) @@ -163,15 +158,20 @@ def attach_receipt( headers: dict, logger: logging.Logger ): + """ + Given a Signed Statement and a corresponding Entry ID, fetch a Receipt from + the Transparency Service and write out a complete Transparent Statement + """ # Get the receipt response = requests.get( f'https://app.datatrails.ai/archivist/v1/publicscitt/entries/{entry_id}/receipt', - headers=headers + headers=headers, + timeout=REQUEST_TIMEOUT ) if response.status_code != 200: logger.error("FAILED to get receipt") logger.debug(response) - exit(1) + sys.exit(1) logger.debug(response.content) @@ -192,7 +192,7 @@ def attach_receipt( def main(): - """Creates a signed statement""" + """Creates a Transparent Statement""" parser = argparse.ArgumentParser(description="Create a signed statement.") @@ -232,20 +232,20 @@ def main(): # Submit Signed Statement to DataTrails op_id = submit_statement(args.signed_statement_file, auth_headers, logger) - logging.info(f'Successfully submitted with Operation ID {op_id}') + logging.info('Successfully submitted with Operation ID %s', op_id) # If the client wants the Transparent Statement, wait for it if args.output_file != "" : - logging.info(f'Now waiting for registration to complete') + logging.info('Now waiting for registration to complete') # Wait for the registration to complete try: entry_id = wait_for_entry_id(op_id, auth_headers, logger) except TimeoutError as e: logger.error(e) - exit(1) + sys.exit(1) - logger.info(f'Fully Registered with Entry ID {entry_id}') + logger.info('Fully Registered with Entry ID %s', entry_id) # Attach the receipt attach_receipt( From 7973e68f9280daece821d252077d4ec68b3c558d Mon Sep 17 00:00:00 2001 From: Jon Geater Date: Sat, 20 Jul 2024 16:12:05 -0700 Subject: [PATCH 05/12] More more linting --- scitt/dump_cbor.py | 6 ++- scitt/register_signed_statement.py | 70 ++++++++++++------------------ 2 files changed, 32 insertions(+), 44 deletions(-) diff --git a/scitt/dump_cbor.py b/scitt/dump_cbor.py index a111643..6a7d16a 100755 --- a/scitt/dump_cbor.py +++ b/scitt/dump_cbor.py @@ -8,7 +8,9 @@ def main(): """Dumps content of a supposed CBOR file""" - parser = argparse.ArgumentParser(description="Dumps content of a supposed CBOR file") + parser = argparse.ArgumentParser( + description="Dumps content of a supposed CBOR file" + ) # Signed Statement file parser.add_argument( @@ -20,7 +22,7 @@ def main(): args = parser.parse_args() - with open(args.input, 'rb') as data_file: + with open(args.input, "rb") as data_file: data = data_file.read() message = Sign1Message.decode(data) print("\ncbor decoded cose sign1 statement:\n") diff --git a/scitt/register_signed_statement.py b/scitt/register_signed_statement.py index a67e98c..8002677 100755 --- a/scitt/register_signed_statement.py +++ b/scitt/register_signed_statement.py @@ -31,9 +31,7 @@ POLL_INTERVAL = 10 -def get_dt_auth_header( - logger: logging.Logger -) -> str: +def get_dt_auth_header(logger: logging.Logger) -> str: """ Get DataTrails bearer token from OIDC credentials in env """ @@ -42,18 +40,20 @@ def get_dt_auth_header( client_secret = os.environ.get("DATATRAILS_CLIENT_SECRET") if client_id is None or client_secret is None: - logger.error("Please configure your DataTrails credentials in the shell environment") + logger.error( + "Please configure your DataTrails credentials in the shell environment" + ) sys.exit(1) # Get token from the auth endpoint response = requests.post( "https://app.datatrails.ai/archivist/iam/v1/appidp/token", data={ - 'grant_type': 'client_credentials', - 'client_id': client_id, - 'client_secret': client_secret + "grant_type": "client_credentials", + "client_id": client_id, + "client_secret": client_secret, }, - timeout=REQUEST_TIMEOUT + timeout=REQUEST_TIMEOUT, ) if response.status_code != 200: logger.error("FAILED to acquire bearer token") @@ -66,24 +66,22 @@ def get_dt_auth_header( def submit_statement( - statement_file_path: str, - headers: dict, - logger: logging.Logger + statement_file_path: str, headers: dict, logger: logging.Logger ) -> str: """ Given a Signed Statement CBOR file on disk, register it on the DataTrails Transparency Service over the SCITT interface """ # Read the binary data from the file - with open(statement_file_path, 'rb') as data_file: + with open(statement_file_path, "rb") as data_file: data = data_file.read() # Make the POST request response = requests.post( - 'https://app.datatrails.ai/archivist/v1/publicscitt/entries', + "https://app.datatrails.ai/archivist/v1/publicscitt/entries", headers=headers, data=data, - timeout=REQUEST_TIMEOUT + timeout=REQUEST_TIMEOUT, ) if response.status_code != 200: logger.error("FAILED to submit statement") @@ -100,28 +98,22 @@ def submit_statement( return res["operationID"] -def get_operation_status( - operation_id: str, - headers: dict -) -> dict: +def get_operation_status(operation_id: str, headers: dict) -> dict: """ Gets the status of a long-running registration operation """ response = requests.get( f"https://app.datatrails.ai/archivist/v1/publicscitt/operations/{operation_id}", headers=headers, - timeout=REQUEST_TIMEOUT) + timeout=REQUEST_TIMEOUT, + ) response.raise_for_status() return response.json() -def wait_for_entry_id( - operation_id: str, - headers: dict, - logger: logging.Logger -) -> str: +def wait_for_entry_id(operation_id: str, headers: dict, logger: logging.Logger) -> str: """ Polls for the operation status to be 'succeeded'. """ @@ -156,7 +148,7 @@ def attach_receipt( signed_statement_filepath: str, transparent_statement_file_path: str, headers: dict, - logger: logging.Logger + logger: logging.Logger, ): """ Given a Signed Statement and a corresponding Entry ID, fetch a Receipt from @@ -164,9 +156,9 @@ def attach_receipt( """ # Get the receipt response = requests.get( - f'https://app.datatrails.ai/archivist/v1/publicscitt/entries/{entry_id}/receipt', + f"https://app.datatrails.ai/archivist/v1/publicscitt/entries/{entry_id}/receipt", headers=headers, - timeout=REQUEST_TIMEOUT + timeout=REQUEST_TIMEOUT, ) if response.status_code != 200: logger.error("FAILED to get receipt") @@ -176,17 +168,17 @@ def attach_receipt( logger.debug(response.content) # Open up the signed statement - with open(signed_statement_filepath, 'rb') as data_file: + with open(signed_statement_filepath, "rb") as data_file: data = data_file.read() message = Sign1Message.decode(data) logger.debug(message) # Add receipt to the unprotected header and re-encode - message.uhdr['receipts'] = [response.content] + message.uhdr["receipts"] = [response.content] ts = Sign1Message.encode(message, sign=False) # Write out the updated Transparent Statement - with open(transparent_statement_file_path, 'wb') as file: + with open(transparent_statement_file_path, "wb") as file: file.write(ts) logger.info("File saved successfully") @@ -226,17 +218,15 @@ def main(): logging.basicConfig(level=logging.getLevelName(args.log_level)) # Get auth - auth_headers = { - 'Authorization': get_dt_auth_header(logger) - } + auth_headers = {"Authorization": get_dt_auth_header(logger)} # Submit Signed Statement to DataTrails op_id = submit_statement(args.signed_statement_file, auth_headers, logger) - logging.info('Successfully submitted with Operation ID %s', op_id) + logging.info("Successfully submitted with Operation ID %s", op_id) # If the client wants the Transparent Statement, wait for it - if args.output_file != "" : - logging.info('Now waiting for registration to complete') + if args.output_file != "": + logging.info("Now waiting for registration to complete") # Wait for the registration to complete try: @@ -245,15 +235,11 @@ def main(): logger.error(e) sys.exit(1) - logger.info('Fully Registered with Entry ID %s', entry_id) + logger.info("Fully Registered with Entry ID %s", entry_id) # Attach the receipt attach_receipt( - entry_id, - args.signed_statement_file, - args.output_file, - auth_headers, - logger + entry_id, args.signed_statement_file, args.output_file, auth_headers, logger ) From 3b1df2f3c56d9a7d9a561a3bbf89141706c4beb2 Mon Sep 17 00:00:00 2001 From: Jon Geater Date: Sat, 20 Jul 2024 16:24:13 -0700 Subject: [PATCH 06/12] More more more linting --- scitt/register_signed_statement.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scitt/register_signed_statement.py b/scitt/register_signed_statement.py index 8002677..93523f3 100755 --- a/scitt/register_signed_statement.py +++ b/scitt/register_signed_statement.py @@ -175,7 +175,7 @@ def attach_receipt( # Add receipt to the unprotected header and re-encode message.uhdr["receipts"] = [response.content] - ts = Sign1Message.encode(message, sign=False) + ts = message.encode(sign=False) # Write out the updated Transparent Statement with open(transparent_statement_file_path, "wb") as file: From 6d3087bf9861414f28d08a0b9ee306390e4ee88c Mon Sep 17 00:00:00 2001 From: Jon Geater Date: Sun, 21 Jul 2024 15:07:56 -0700 Subject: [PATCH 07/12] Upgrade verify script to support Transprent Statement as well as Receipt --- scitt/verify_receipt_signature.py | 82 ++++++++++++++++++++++--------- 1 file changed, 59 insertions(+), 23 deletions(-) diff --git a/scitt/verify_receipt_signature.py b/scitt/verify_receipt_signature.py index a578c4a..e28bde0 100644 --- a/scitt/verify_receipt_signature.py +++ b/scitt/verify_receipt_signature.py @@ -19,14 +19,22 @@ HEADER_LABEL_DID = 391 -def open_receipt(receipt_file: str) -> bytes: +def read_cbor_file(cbor_file: str) -> bytes | None: """ opens the receipt from the receipt file. NOTE: the receipt is expected to be in cbor encoding. """ - with open(receipt_file, "rb") as file: + with open(cbor_file, "rb") as file: receipt = file.read() - return receipt + + # decode the cbor encoded cose sign1 message + try: + message = Sign1Message.decode(receipt) + except (ValueError, AttributeError): + print("failed to decode cose sign1 from file", file=sys.stderr) + return None + + return message def get_didweb_pubkey(didurl: str, kid: bytes) -> dict: @@ -92,53 +100,81 @@ def get_didweb_pubkey(didurl: str, kid: bytes) -> dict: raise ValueError(f"no key with kid: {kid} in verification methods of did document") -def verify_receipt(receipt: bytes) -> bool: +def verify_receipt(receipt: Sign1Message) -> bool: """ verifies the counter signed receipt signature """ - # decode the cbor encoded cose sign1 message - try: - message = Sign1Message.decode(receipt) - except (ValueError, AttributeError): - print("failed to decode cose sign1 receipt", file=sys.stderr) - return False - # get the verification key from didweb - kid: bytes = message.phdr[KID] - didurl = message.phdr[HEADER_LABEL_DID] + kid: bytes = receipt.phdr[KID] + didurl = receipt.phdr[HEADER_LABEL_DID] cose_key_dict = get_didweb_pubkey(didurl, kid) cose_key = CoseKey.from_dict(cose_key_dict) - message.key = cose_key + receipt.key = cose_key # verify the counter signed receipt signature - verified = message.verify_signature() + verified = receipt.verify_signature() return verified +def verify_transparent_statement(transparent_statement: Sign1Message) -> bool: + """ + verifies the counter signed receipt signature in a TS + """ + + # Pull the receipt out of the structure + try: + receipt_bytes = transparent_statement.uhdr["receipts"][0] + except (ValueError, AttributeError, KeyError): + print("failed to extract receipt from Transparent Statement", file=sys.stderr) + return False + + # Re-constitute it as a COSE object + try: + print(receipt_bytes) + receipt = Sign1Message.decode(receipt_bytes) + except (ValueError, AttributeError): + print("failed to extract receipt from Transparent Statement", file=sys.stderr) + return False + + # Verify it + print(receipt) + return verify_receipt(receipt) + + def main(): """Verifies a counter signed receipt signature""" parser = argparse.ArgumentParser( - description="Verify a counter signed receipt signature." + description="Verify a counter signed receipt signature from a Receipt or Transparent Statement." ) - # signing key file - parser.add_argument( + options = parser.add_argument_group("Input File Type") + options.add_argument( "--receipt-file", type=str, - help="filepath to the stored receipt, in cbor format.", - default="scitt-receipt.cbor", + help="filepath to a stored Receipt, in CBOR format.", + ) + options.add_argument( + "--transparent-statement-file", + type=str, + help="filepath to a stored Transparent Statement, in CBOR format.", + default="transparent-statement.cbor", ) args = parser.parse_args() - receipt = open_receipt(args.receipt_file) - - verified = verify_receipt(receipt) + if args.receipt_file: + receipt = read_cbor_file(args.receipt_file) + verified = verify_receipt(receipt) + else: + # Note this logic works because only the transparent statement arg + # has a default. Don't change that without changing this! + transparent_statement = read_cbor_file(args.transparent_statement_file) + verified = verify_transparent_statement(transparent_statement) if verified: print("signature verification succeeded") From c65b076df36b7b20e9d2112f70320869cd53b745 Mon Sep 17 00:00:00 2001 From: Jon Geater Date: Sun, 21 Jul 2024 15:16:36 -0700 Subject: [PATCH 08/12] Unit tests --- scitt/verify_receipt_signature.py | 4 ++-- unittests/test_verify_receipt_signature.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/scitt/verify_receipt_signature.py b/scitt/verify_receipt_signature.py index e28bde0..bbdd473 100644 --- a/scitt/verify_receipt_signature.py +++ b/scitt/verify_receipt_signature.py @@ -19,7 +19,7 @@ HEADER_LABEL_DID = 391 -def read_cbor_file(cbor_file: str) -> bytes | None: +def read_cbor_file(cbor_file: str) -> Sign1Message: """ opens the receipt from the receipt file. NOTE: the receipt is expected to be in cbor encoding. @@ -149,7 +149,7 @@ def main(): """Verifies a counter signed receipt signature""" parser = argparse.ArgumentParser( - description="Verify a counter signed receipt signature from a Receipt or Transparent Statement." + description="Verify countersigned signature from a Receipt or Transparent Statement." ) options = parser.add_argument_group("Input File Type") diff --git a/unittests/test_verify_receipt_signature.py b/unittests/test_verify_receipt_signature.py index 5f773d9..29d93a7 100644 --- a/unittests/test_verify_receipt_signature.py +++ b/unittests/test_verify_receipt_signature.py @@ -4,7 +4,7 @@ import unittest -from scitt.verify_receipt_signature import verify_receipt, open_receipt +from scitt.verify_receipt_signature import verify_receipt, read_cbor_file from .constants import KNOWN_RECEIPT_FILE @@ -19,7 +19,7 @@ def test_verify_kat_receipt(self): """ tests we can verify the signature of a known receipt. """ - receipt = open_receipt(KNOWN_RECEIPT_FILE) + receipt = read_cbor_file(KNOWN_RECEIPT_FILE) verified = verify_receipt(receipt) From 35dbbb745f1331c1fbea8f72c392d9990577562b Mon Sep 17 00:00:00 2001 From: Jon Geater Date: Sun, 21 Jul 2024 15:22:58 -0700 Subject: [PATCH 09/12] Tidy up types --- scitt/verify_receipt_signature.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scitt/verify_receipt_signature.py b/scitt/verify_receipt_signature.py index bbdd473..b4798e8 100644 --- a/scitt/verify_receipt_signature.py +++ b/scitt/verify_receipt_signature.py @@ -31,8 +31,9 @@ def read_cbor_file(cbor_file: str) -> Sign1Message: try: message = Sign1Message.decode(receipt) except (ValueError, AttributeError): + # This is fatal print("failed to decode cose sign1 from file", file=sys.stderr) - return None + sys.exit(1) return message From a938590efda66dea85a80c9542e9a6e01b9f3329 Mon Sep 17 00:00:00 2001 From: Jon Geater Date: Sun, 21 Jul 2024 15:31:51 -0700 Subject: [PATCH 10/12] Tidy up variable names --- scitt/verify_receipt_signature.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/scitt/verify_receipt_signature.py b/scitt/verify_receipt_signature.py index b4798e8..bf81f43 100644 --- a/scitt/verify_receipt_signature.py +++ b/scitt/verify_receipt_signature.py @@ -25,17 +25,17 @@ def read_cbor_file(cbor_file: str) -> Sign1Message: NOTE: the receipt is expected to be in cbor encoding. """ with open(cbor_file, "rb") as file: - receipt = file.read() + contents = file.read() # decode the cbor encoded cose sign1 message try: - message = Sign1Message.decode(receipt) + cose_object = Sign1Message.decode(contents) except (ValueError, AttributeError): # This is fatal print("failed to decode cose sign1 from file", file=sys.stderr) sys.exit(1) - return message + return cose_object def get_didweb_pubkey(didurl: str, kid: bytes) -> dict: @@ -106,6 +106,8 @@ def verify_receipt(receipt: Sign1Message) -> bool: verifies the counter signed receipt signature """ + print(receipt) + # get the verification key from didweb kid: bytes = receipt.phdr[KID] didurl = receipt.phdr[HEADER_LABEL_DID] From 72df1d94d8515af032e1df6acc8d34d24a432581 Mon Sep 17 00:00:00 2001 From: Jon Geater Date: Sun, 21 Jul 2024 15:37:44 -0700 Subject: [PATCH 11/12] Tidy up stray debug --- .gitignore | 1 + scitt/verify_receipt_signature.py | 6 +----- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index 1533655..4f1c24f 100644 --- a/.gitignore +++ b/.gitignore @@ -5,5 +5,6 @@ scitt-signing-key.pem scitt-receipt.txt scitt/artifacts/_manifest/* my-signing-key.pem +receipt.cbor signed-statement.cbor transparent-statement.cbor diff --git a/scitt/verify_receipt_signature.py b/scitt/verify_receipt_signature.py index bf81f43..2a09fde 100644 --- a/scitt/verify_receipt_signature.py +++ b/scitt/verify_receipt_signature.py @@ -106,8 +106,6 @@ def verify_receipt(receipt: Sign1Message) -> bool: verifies the counter signed receipt signature """ - print(receipt) - # get the verification key from didweb kid: bytes = receipt.phdr[KID] didurl = receipt.phdr[HEADER_LABEL_DID] @@ -118,7 +116,7 @@ def verify_receipt(receipt: Sign1Message) -> bool: receipt.key = cose_key # verify the counter signed receipt signature - verified = receipt.verify_signature() + verified = receipt.verify_signature() # type: ignore return verified @@ -137,14 +135,12 @@ def verify_transparent_statement(transparent_statement: Sign1Message) -> bool: # Re-constitute it as a COSE object try: - print(receipt_bytes) receipt = Sign1Message.decode(receipt_bytes) except (ValueError, AttributeError): print("failed to extract receipt from Transparent Statement", file=sys.stderr) return False # Verify it - print(receipt) return verify_receipt(receipt) From 85ac61bb5da132e130756d01bb1187b6b86462e1 Mon Sep 17 00:00:00 2001 From: Jon Geater Date: Sun, 21 Jul 2024 15:46:46 -0700 Subject: [PATCH 12/12] Clean-up --- scitt/verify_receipt_signature.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scitt/verify_receipt_signature.py b/scitt/verify_receipt_signature.py index 2a09fde..7b6e48d 100644 --- a/scitt/verify_receipt_signature.py +++ b/scitt/verify_receipt_signature.py @@ -116,7 +116,7 @@ def verify_receipt(receipt: Sign1Message) -> bool: receipt.key = cose_key # verify the counter signed receipt signature - verified = receipt.verify_signature() # type: ignore + verified = receipt.verify_signature() # type: ignore return verified