From 06036fd76757646eed011299190ad83ff1b73635 Mon Sep 17 00:00:00 2001 From: Robin Bryce Date: Wed, 11 Dec 2024 10:14:12 +0000 Subject: [PATCH] update to work with improved outputformat from samples register --- .../scripts/register_signed_statement.py | 25 ++- .../scripts/verify_receipt.py | 156 +++++++++++++++ .../scripts/verify_receipt.py.disabled | 185 ------------------ .../statement_registration.py | 4 +- 4 files changed, 174 insertions(+), 196 deletions(-) create mode 100644 datatrails_scitt_samples/scripts/verify_receipt.py delete mode 100644 datatrails_scitt_samples/scripts/verify_receipt.py.disabled diff --git a/datatrails_scitt_samples/scripts/register_signed_statement.py b/datatrails_scitt_samples/scripts/register_signed_statement.py index d9063f3..8e0a59f 100755 --- a/datatrails_scitt_samples/scripts/register_signed_statement.py +++ b/datatrails_scitt_samples/scripts/register_signed_statement.py @@ -3,6 +3,7 @@ a Transparent Statement""" import sys +import json import argparse from pycose.messages import Sign1Message @@ -103,17 +104,20 @@ def main(args=None): op_id = submit_statement_from_file(ctx, args.signed_statement_file) ctx.info("Successfully submitted with Operation ID %s", op_id) + # Always wait for registration to complete + ctx.info("Waiting for registration to complete") + # Wait for the registration to complete + try: + entry_id = wait_for_entry_id(ctx, op_id) + except TimeoutError as e: + ctx.error(e) + sys.exit(1) + ctx.info("Fully Registered with Entry ID %s", entry_id) + + result = {"entryid": entry_id} + # If the client wants the Transparent Statement or receipt, wait for registration to complete if args.verify or args.output_file != "": - ctx.info("Waiting for registration to complete") - # Wait for the registration to complete - try: - entry_id = wait_for_entry_id(ctx, op_id) - except TimeoutError as e: - ctx.error(e) - sys.exit(1) - ctx.info("Fully Registered with Entry ID %s", entry_id) - leaf = get_leaf_hash(ctx, entry_id) # Notice: the leaf hash corresponds to the leaf hash visible in the UI ctx.info("Leaf Hash: %s", leaf.hex()) @@ -128,6 +132,7 @@ def main(args=None): if not verify_receipt_mmriver(receipt, leaf): ctx.info("Receipt verification failed") sys.exit(1) + result["leaf"] = leaf.hex() if args.output_file == "": return @@ -141,6 +146,8 @@ def main(args=None): attach_receipt(receipt, args.signed_statement_file, args.output_file) ctx.info(f"File saved successfully {args.output_file}") + print(json.dumps(result)) + if __name__ == "__main__": main() diff --git a/datatrails_scitt_samples/scripts/verify_receipt.py b/datatrails_scitt_samples/scripts/verify_receipt.py new file mode 100644 index 0000000..aba9dde --- /dev/null +++ b/datatrails_scitt_samples/scripts/verify_receipt.py @@ -0,0 +1,156 @@ +""" Module for verifying the counter signed receipt signature """ + +import re +import argparse +import sys +import json + +import requests + +from jwcrypto import jwk + +from pycose.messages import Sign1Message +from pycose.keys.curves import P384 +from pycose.keys.keyparam import KpKty, EC2KpX, EC2KpY, KpKeyOps, EC2KpCurve +from pycose.keys.keytype import KtyEC2 +from pycose.keys.keyops import VerifyOp +from pycose.keys import CoseKey +from pycose.headers import KID + +from datatrails_scitt_samples.cose_receipt_verification import ( + verify_receipt_mmriver +) +from datatrails_scitt_samples.scripts.fileaccess import open_event_json +from datatrails_scitt_samples.datatrails.eventpreimage import get_event +from datatrails_scitt_samples.datatrails.v3eventhash import v3leaf_hash, v3event_hash +from datatrails_scitt_samples.datatrails.entryid import entryid_to_identity + +from datatrails_scitt_samples.datatrails.servicecontext import ServiceContext + +HEADER_LABEL_DID = 391 + + +def read_cbor_file(cbor_file: str) -> bytes: + """ + opens the receipt from the receipt file. + """ + with open(cbor_file, "rb") as file: + contents = file.read() + + # decode the cbor encoded cose sign1 message + try: + cose_object = Sign1Message.decode(contents) + except (ValueError, AttributeError): + # This is fatal + print("failed to decode cose sign1 from file", file=sys.stderr) + sys.exit(1) + + return cose_object + + +def verify_transparent_statement(transparent_statement: Sign1Message, leaf: bytes) -> bool: + """ + verifies the counter signed receipt signature in a TS + """ + + # Pull the receipt out of the structure + try: + receipt_bytes = transparent_statement.uhdr["receipts"][0] + except (ValueError, AttributeError, KeyError): + print("failed to extract receipt from Transparent Statement", file=sys.stderr) + return False + + return verify_receipt_mmriver(receipt_bytes, leaf) + + +def main(): + """Verifies a counter signed receipt signature""" + + parser = argparse.ArgumentParser( + description="Verify countersigned signature from a Receipt or Transparent Statement." + ) + parser.add_argument( + "--datatrails-url", + type=str, + help="The url of the DataTrails transparency service.", + default=None, + ) + options = parser.add_argument_group("Node (Leaf) Hash") + options.add_argument( + "--leaf", + type=str, + help="hex encoded leaf hash to verify against") + + options.add_argument( + "--entryid", + type=str, + help="the SCRAPI entry id of the statement") + + parser.add_argument( + "--event-json-file", + type=str, + help="filepath to the stored event, in json format.", + default=None, + ) + + options = parser.add_argument_group("Input File Type") + options.add_argument( + "--receipt-file", + type=str, + help="filepath to a stored Receipt, in CBOR format.", + ) + options.add_argument( + "--transparent-statement-file", + type=str, + help="filepath to a stored Transparent Statement, in CBOR format.", + default="transparent-statement.cbor", + ) + + args = parser.parse_args() + + # Note: the context is only used if --entryid is + # used to obtain the leaf hash directly from datatrails + cfg_overrides = {} + if args.datatrails_url: + cfg_overrides["datatrails_url"] = args.datatrails_url + ctx = ServiceContext.from_env("verify-receipt", **cfg_overrides) + + if not (args.leaf or args.event_json_file or args.entryid): + print("either --leaf or --event-json-file is required", file=sys.stderr) + sys.exit(1) + + leaf = None + if args.leaf: + leaf = bytes.fromhex(args.leaf) + elif args.event_json_file: + event = json.loads(open_event_json(args.event_json_file)) + leaf = v3leaf_hash(event) + print(leaf.hex()) + elif args.entryid: + identity = entryid_to_identity(args.entryid) + event = get_event(ctx, identity, True) + leaf = v3leaf_hash(event) + print(leaf.hex()) + + if leaf is None: + print("failed to obtain leaf hash", file=sys.stderr) + sys.exit(1) + + if args.receipt_file: + with open(args.receipt_file, "rb") as file: + contents = file.read() + verified = verify_transparent_statement(contents, leaf) + else: + # Note this logic works because only the transparent statement arg + # has a default. Don't change that without changing this! + transparent_statement = read_cbor_file(args.transparent_statement_file) + verified = verify_transparent_statement(transparent_statement, leaf) + + if verified: + print("signature verification succeeded") + else: + print("signature verification failed") + + +if __name__ == "__main__": + main() diff --git a/datatrails_scitt_samples/scripts/verify_receipt.py.disabled b/datatrails_scitt_samples/scripts/verify_receipt.py.disabled deleted file mode 100644 index 7b6e48d..0000000 --- a/datatrails_scitt_samples/scripts/verify_receipt.py.disabled +++ /dev/null @@ -1,185 +0,0 @@ -""" Module for verifying the counter signed receipt signature """ - -import re -import argparse -import sys - -import requests - -from jwcrypto import jwk - -from pycose.messages import Sign1Message -from pycose.keys.curves import P384 -from pycose.keys.keyparam import KpKty, EC2KpX, EC2KpY, KpKeyOps, EC2KpCurve -from pycose.keys.keytype import KtyEC2 -from pycose.keys.keyops import VerifyOp -from pycose.keys import CoseKey -from pycose.headers import KID - -HEADER_LABEL_DID = 391 - - -def read_cbor_file(cbor_file: str) -> Sign1Message: - """ - opens the receipt from the receipt file. - NOTE: the receipt is expected to be in cbor encoding. - """ - with open(cbor_file, "rb") as file: - contents = file.read() - - # decode the cbor encoded cose sign1 message - try: - cose_object = Sign1Message.decode(contents) - except (ValueError, AttributeError): - # This is fatal - print("failed to decode cose sign1 from file", file=sys.stderr) - sys.exit(1) - - return cose_object - - -def get_didweb_pubkey(didurl: str, kid: bytes) -> dict: - """ - gets the given did web public key, given the key ID (kid) and didurl. - see https://w3c-ccg.github.io/did-method-web/ - NOTE: expects the key to be ecdsa P-384. - """ - - # check the didurl is a valid did web url - # pylint: disable=line-too-long - pattern = r"did:web:(?P[a-zA-Z0-9/.\-_]+)(?:%3A(?P[0-9]+))?(:*)(?P[a-zA-Z0-9/.:\-_]*)" - match = re.match(pattern, didurl) - - if not match: - raise ValueError("DID is not a valid did:web") - - # convert the didweb url into a url: - # - # e.g. did:web:example.com:foo:bar - # becomes: https://example.com/foo/bar/did.json - groups = match.groupdict() - host = groups["host"] - port = groups.get("port") # might be None - path = groups["path"] - - origin = f"{host}:{port}" if port else host - - protocol = "https" - - decoded_partial_path = path.replace(":", "/") - - endpoint = ( - f"{protocol}://{origin}/{decoded_partial_path}/did.json" - if path - else f"{protocol}://{origin}/.well-known/did.json" - ) - - # do a https GET on the url to get the did document - resp = requests.get(endpoint, timeout=60) - assert resp.status_code == 200 - - did_document = resp.json() - - # now search the verification methods for the correct public key - for verification_method in did_document["verificationMethod"]: - if verification_method["publicKeyJwk"]["kid"] != kid.decode("utf-8"): - continue - - x_part = verification_method["publicKeyJwk"]["x"] - y_part = verification_method["publicKeyJwk"]["y"] - - cose_key = { - KpKty: KtyEC2, - EC2KpCurve: P384, - KpKeyOps: [VerifyOp], - EC2KpX: jwk.base64url_decode(x_part), - EC2KpY: jwk.base64url_decode(y_part), - } - - return cose_key - - raise ValueError(f"no key with kid: {kid} in verification methods of did document") - - -def verify_receipt(receipt: Sign1Message) -> bool: - """ - verifies the counter signed receipt signature - """ - - # get the verification key from didweb - kid: bytes = receipt.phdr[KID] - didurl = receipt.phdr[HEADER_LABEL_DID] - - cose_key_dict = get_didweb_pubkey(didurl, kid) - cose_key = CoseKey.from_dict(cose_key_dict) - - receipt.key = cose_key - - # verify the counter signed receipt signature - verified = receipt.verify_signature() # type: ignore - - return verified - - -def verify_transparent_statement(transparent_statement: Sign1Message) -> bool: - """ - verifies the counter signed receipt signature in a TS - """ - - # Pull the receipt out of the structure - try: - receipt_bytes = transparent_statement.uhdr["receipts"][0] - except (ValueError, AttributeError, KeyError): - print("failed to extract receipt from Transparent Statement", file=sys.stderr) - return False - - # Re-constitute it as a COSE object - try: - receipt = Sign1Message.decode(receipt_bytes) - except (ValueError, AttributeError): - print("failed to extract receipt from Transparent Statement", file=sys.stderr) - return False - - # Verify it - return verify_receipt(receipt) - - -def main(): - """Verifies a counter signed receipt signature""" - - parser = argparse.ArgumentParser( - description="Verify countersigned signature from a Receipt or Transparent Statement." - ) - - options = parser.add_argument_group("Input File Type") - options.add_argument( - "--receipt-file", - type=str, - help="filepath to a stored Receipt, in CBOR format.", - ) - options.add_argument( - "--transparent-statement-file", - type=str, - help="filepath to a stored Transparent Statement, in CBOR format.", - default="transparent-statement.cbor", - ) - - args = parser.parse_args() - - if args.receipt_file: - receipt = read_cbor_file(args.receipt_file) - verified = verify_receipt(receipt) - else: - # Note this logic works because only the transparent statement arg - # has a default. Don't change that without changing this! - transparent_statement = read_cbor_file(args.transparent_statement_file) - verified = verify_transparent_statement(transparent_statement) - - if verified: - print("signature verification succeeded") - else: - print("signature verification failed") - - -if __name__ == "__main__": - main() diff --git a/datatrails_scitt_samples/statement_registration.py b/datatrails_scitt_samples/statement_registration.py index a698e19..85a08e9 100644 --- a/datatrails_scitt_samples/statement_registration.py +++ b/datatrails_scitt_samples/statement_registration.py @@ -60,7 +60,7 @@ def submit_statement_from_file( # Read the binary data from the file # Read the binary data from the file with open(statement_file_path, "rb") as data_file: - ctx.info("statement_file_path opened: %s", statement_file_path) + ctx.debug("statement_file_path opened: %s", statement_file_path) return submit_statement(ctx, data_file.read()) @@ -90,7 +90,7 @@ def wait_for_entry_id( poll_attempts: int = int(ctx.cfg.poll_timeout / ctx.cfg.poll_interval) - ctx.info("starting to poll for operation status 'succeeded'") + ctx.debug("starting to poll for operation status 'succeeded'") for _ in range(poll_attempts): try: