diff --git a/.gitignore b/.gitignore index 30c8a53..01977d6 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,8 @@ .env.* .envrc .vscode/launch.json +build/ +dist/ payload.json scitt/artifacts/_manifest/* datatrails_scitt_samples/__pycache__/* @@ -12,3 +14,8 @@ datatrails_scitt_samples/scripts/__pycache__/* datatrails_scitt_samples/datatrails/__pycache__/* datatrails_scitt_samples/mmriver/__pycache__/* venv/* +payload.txt +signed-statement.cbor +statement-receipt.cbor +transparent-statement.cbor +verified_payload.txt diff --git a/datatrails_scitt_samples/cbor_header_labels.py b/datatrails_scitt_samples/cbor_header_labels.py index 045ef96..d0aed49 100644 --- a/datatrails_scitt_samples/cbor_header_labels.py +++ b/datatrails_scitt_samples/cbor_header_labels.py @@ -1,8 +1,13 @@ """Definitions of all COSE, SCITT, CBOR labels used by these exmaples""" -# CWT header label comes from version 4 of the scitt architecture document +# CWT header label previously came from version 4 of the scitt architecture document # https://www.ietf.org/archive/id/draft-ietf-scitt-architecture-04.html#name-issuer-identity -HEADER_LABEL_CWT = 13 +HEADER_LABEL_CWT_SCITT_DRAFT_04 = 13 + +# CWT header label is defined by +# https://datatracker.ietf.org/doc/html/draft-ietf-cose-cwt-claims-in-headers-10#section-2 +# And referenced by https://www.ietf.org/archive/id/draft-ietf-scitt-architecture-10.html#name-signed-statements +HEADER_LABEL_CWT = 15 # subject header label comes from version 2 of the scitt architecture document # https://www.ietf.org/archive/id/draft-birkholz-scitt-architecture-02.html#name-envelope-and-claim-format diff --git a/datatrails_scitt_samples/cose_cnf_key.py b/datatrails_scitt_samples/cose_cnf_key.py index 28611b2..4b8ff9e 100644 --- a/datatrails_scitt_samples/cose_cnf_key.py +++ b/datatrails_scitt_samples/cose_cnf_key.py @@ -9,7 +9,10 @@ from pycose.keys.keytype import KtyEC2 from pycose.keys.keyparam import KpKty, KpKeyOps, EC2KpCurve -from datatrails_scitt_samples.cbor_header_labels import HEADER_LABEL_CWT +from datatrails_scitt_samples.cbor_header_labels import ( + HEADER_LABEL_CWT, + HEADER_LABEL_CWT_SCITT_DRAFT_04, +) from datatrails_scitt_samples.cbor_header_labels import HEADER_LABEL_CWT_CNF from datatrails_scitt_samples.cbor_header_labels import HEADER_LABEL_CNF_COSE_KEY @@ -20,7 +23,10 @@ def cnf_key_from_phdr(phdr: dict) -> CoseKey: """ cwt_claims = phdr.get(HEADER_LABEL_CWT) if cwt_claims is None: - raise ValueError("Missing cwt claims in protected header") + # fall back to scitt draft 04 + cwt_claims = phdr.get(HEADER_LABEL_CWT_SCITT_DRAFT_04) + if cwt_claims is None: + raise ValueError("Missing cwt claims in protected header") # Note: issuer is the key vault key identity, subject is the tenant's merkle log tile path cnf_claim = cwt_claims.get(HEADER_LABEL_CWT_CNF) diff --git a/datatrails_scitt_samples/scripts/create_hashed_signed_statement.py b/datatrails_scitt_samples/scripts/create_hashed_signed_statement.py index 0405a67..1094156 100755 --- a/datatrails_scitt_samples/scripts/create_hashed_signed_statement.py +++ b/datatrails_scitt_samples/scripts/create_hashed_signed_statement.py @@ -5,6 +5,7 @@ import sys from datatrails_scitt_samples.statement_creation import create_hashed_signed_statement +from datatrails_scitt_samples.statement_creation import OPTION_USE_DRAFT_04_LABELS from datatrails_scitt_samples.scripts.fileaccess import read_file, open_signing_key from hashlib import sha256 @@ -83,6 +84,12 @@ def main(args=None): help="subject to correlate statements made about an artifact.", ) + parser.add_argument( + "--use-draft-04-labels", + help="force use of legacy labels (eg cwt_claims label 13 rather than 15)", + action="store_true", + ) + args = parser.parse_args(args or sys.argv[1:]) if args.metadata_file is not None: @@ -90,6 +97,10 @@ def main(args=None): else: meta_map_dict = {} + options = {} + if args.use_draft_04_labels: + options[OPTION_USE_DRAFT_04_LABELS] = True + signing_key = open_signing_key(args.signing_key_file) payload_contents = read_file(args.payload_file) payload_hash = sha256(payload_contents.encode("utf-8")).digest() @@ -103,6 +114,7 @@ def main(args=None): payload_location=args.payload_location, signing_key=signing_key, subject=args.subject, + **options, ) with open(args.output_file, "wb") as output_file: diff --git a/datatrails_scitt_samples/scripts/create_signed_statement.py b/datatrails_scitt_samples/scripts/create_signed_statement.py index 8aabaf4..e56d257 100755 --- a/datatrails_scitt_samples/scripts/create_signed_statement.py +++ b/datatrails_scitt_samples/scripts/create_signed_statement.py @@ -6,6 +6,7 @@ from datatrails_scitt_samples.scripts.fileaccess import read_file, open_signing_key from datatrails_scitt_samples.statement_creation import create_signed_statement +from datatrails_scitt_samples.statement_creation import OPTION_USE_DRAFT_04_LABELS def main(args=None): @@ -87,6 +88,12 @@ def main(args=None): default="signed-statement.cbor", ) + parser.add_argument( + "--use-draft-04-labels", + help="force use of legacy labels (eg cwt_claims label 13 rather than 15)", + action="store_true", + ) + args = parser.parse_args(args or sys.argv[1:]) if args.metadata_file is not None: @@ -98,6 +105,10 @@ def main(args=None): # Payload must be encoded to bytes payload = read_file(args.payload_file).encode("utf-8") + options = {} + if args.use_draft_04_labels: + options[OPTION_USE_DRAFT_04_LABELS] = True + signed_statement = create_signed_statement( content_type=args.content_type, issuer=args.issuer, @@ -107,6 +118,7 @@ def main(args=None): payload_location=args.payload_location, subject=args.subject, signing_key=signing_key, + **options, ) with open(args.output_file, "wb") as output_file: diff --git a/datatrails_scitt_samples/statement_creation.py b/datatrails_scitt_samples/statement_creation.py index 7a7ed0e..8c9402c 100644 --- a/datatrails_scitt_samples/statement_creation.py +++ b/datatrails_scitt_samples/statement_creation.py @@ -19,6 +19,7 @@ COSE_TYPE, HEADER_LABEL_FEED, HEADER_LABEL_CWT, + HEADER_LABEL_CWT_SCITT_DRAFT_04, HEADER_LABEL_CWT_ISSUER, HEADER_LABEL_CWT_SUBJECT, HEADER_LABEL_CWT_CNF, @@ -32,6 +33,8 @@ HEADER_LABEL_COSE_ALG_SHA512, ) +OPTION_USE_DRAFT_04_LABELS = "draft_04_labels" + # pylint: disable=too-many-positional-arguments def create_hashed_signed_statement( @@ -44,6 +47,7 @@ def create_hashed_signed_statement( payload_location: str, signing_key: SigningKey, subject: str, + **kwargs, ) -> bytes: """ creates a hashed signed statement, given the signing_key, payload, subject and issuer @@ -63,6 +67,7 @@ def create_hashed_signed_statement( payload_location, verifying_key, subject, + **kwargs, ) # create the cose_key to locally sign the statement using the signing key @@ -85,6 +90,7 @@ def create_hashed_statement( payload_location: str, verifying_key: VerifyingKey, subject: str, + **kwargs, ) -> Sign1Message: """ creates a hashed signed statement, given the verification_key, payload, subject and issuer @@ -106,7 +112,11 @@ def create_hashed_statement( # the verification key is attached to the cwt claims protected_header[Algorithm] = Es256 protected_header[KID] = kid - protected_header[HEADER_LABEL_CWT] = cwt + cwt_label = HEADER_LABEL_CWT + if kwargs.get(OPTION_USE_DRAFT_04_LABELS): + cwt_label = HEADER_LABEL_CWT_SCITT_DRAFT_04 + + protected_header[cwt_label] = cwt # create the statement as a sign1 message using the protected header and payload return Sign1Message(phdr=protected_header, payload=payload) @@ -122,6 +132,7 @@ def create_signed_statement( issuer: str, content_type: str, payload_location: str, + **kwargs, ) -> bytes: """ creates a signed statement, given the signing_key, payload, subject and issuer @@ -132,7 +143,7 @@ def create_signed_statement( raise ValueError("signing key does not have a verifying key") statement = create_statement( - kid, meta_map, verifying_key, payload, subject, issuer, content_type + kid, meta_map, verifying_key, payload, subject, issuer, content_type, **kwargs ) # create the cose_key for locally signing the statement @@ -153,6 +164,7 @@ def create_statement( subject: str, issuer: str, content_type: str, + **kwargs, ) -> Sign1Message: """ creates a statement, given the verification_key, payload, subject and issuer. @@ -169,7 +181,12 @@ def create_statement( protected_header = inline_payload_protected_header(subject, content_type, meta_map) protected_header[Algorithm] = Es256 protected_header[KID] = kid - protected_header[HEADER_LABEL_CWT] = cwt + + cwt_label = HEADER_LABEL_CWT + if kwargs.get(OPTION_USE_DRAFT_04_LABELS): + cwt_label = HEADER_LABEL_CWT_SCITT_DRAFT_04 + + protected_header[cwt_label] = cwt protected_header = { Algorithm: Es256, diff --git a/unittests/create_options.py b/unittests/create_options.py new file mode 100644 index 0000000..5c6f508 --- /dev/null +++ b/unittests/create_options.py @@ -0,0 +1,12 @@ +from datatrails_scitt_samples.statement_creation import OPTION_USE_DRAFT_04_LABELS +from datatrails_scitt_samples.cbor_header_labels import ( + HEADER_LABEL_CWT, + HEADER_LABEL_CWT_SCITT_DRAFT_04, +) + +# Use this until the backend support for cwt label 15 is available +create_options = {OPTION_USE_DRAFT_04_LABELS: True} + + +def get_cwt_phdr(phdr): + return phdr.get(HEADER_LABEL_CWT) or phdr.get(HEADER_LABEL_CWT_SCITT_DRAFT_04) diff --git a/unittests/test_create_hashed_signed_statement.py b/unittests/test_create_hashed_signed_statement.py index 4c39f65..46cba54 100644 --- a/unittests/test_create_hashed_signed_statement.py +++ b/unittests/test_create_hashed_signed_statement.py @@ -1,6 +1,7 @@ """ Pairwise unit tests for creating a signed statement with a hashed payload """ + import unittest import json @@ -21,19 +22,14 @@ from datatrails_scitt_samples.statement_creation import ( cose_key_ec2_p256, create_hashed_signed_statement, - create_hashed_statement + create_hashed_statement, ) -from datatrails_scitt_samples.cose_sign1message import ( - extract_to_be_signed -) +from datatrails_scitt_samples.cose_sign1message import extract_to_be_signed -from datatrails_scitt_samples.cose_cnf_key import ( - cnf_key_from_phdr -) +from datatrails_scitt_samples.cose_cnf_key import cnf_key_from_phdr from datatrails_scitt_samples.cbor_header_labels import ( - HEADER_LABEL_CWT, HEADER_LABEL_CWT_CNF, HEADER_LABEL_CNF_COSE_KEY, HEADER_LABEL_PAYLOAD_HASH_ALGORITHM, @@ -42,6 +38,8 @@ from .constants import KNOWN_STATEMENT +from .create_options import create_options, get_cwt_phdr + class TestCreateHashedSignedStatement(unittest.TestCase): """ @@ -79,6 +77,7 @@ def test_sign_and_verify_statement(self): payload_hash_alg=payload_hash_alg, payload_location=payload_location, signing_key=signing_key, + **create_options, ) # decode the cbor encoded cose sign1 message @@ -94,7 +93,7 @@ def test_sign_and_verify_statement(self): self.assertEqual(payload_location, message.phdr[HEADER_LABEL_LOCATION]) # get the verification key from cwt cnf - cwt = message.phdr[HEADER_LABEL_CWT] + cwt = get_cwt_phdr(message.phdr) cnf = cwt[HEADER_LABEL_CWT_CNF] verification_key = cnf[HEADER_LABEL_CNF_COSE_KEY] @@ -115,7 +114,6 @@ def test_sign_and_verify_statement(self): self.assertTrue(verified) - def test_create_hashed_statement_remote_sign(self): """Test using the samples api to accomplish remote issuer signing""" @@ -151,6 +149,7 @@ def test_create_hashed_statement_remote_sign(self): payload_hash_alg=payload_hash_alg, payload_location=payload_location, verifying_key=verifying_key, + **create_options, ) # This is essentially compute_signature() from pycose's SignCommon (base of Sign1Message) @@ -165,7 +164,6 @@ def test_create_hashed_statement_remote_sign(self): # Receive signature bytes in response and set them on the statement signature = alg.sign(key=cose_signing_key, data=to_be_signed) - # Now, locally, complete serialization of the statement with the signature attached # This would be nice, but pycose doesn't appear to support it @@ -173,8 +171,16 @@ def test_create_hashed_statement_remote_sign(self): # signed_statement = statement.encode(sign=False) # Instead, we'll just encode directly following the implementation of encod - struct = [statement.phdr_encoded, statement.uhdr_encoded, statement.payload, signature] - signed_statement = cbor2.dumps(cbor2.CBORTag(statement.cbor_tag, struct), default=statement._custom_cbor_encoder) + struct = [ + statement.phdr_encoded, + statement.uhdr_encoded, + statement.payload, + signature, + ] + signed_statement = cbor2.dumps( + cbor2.CBORTag(statement.cbor_tag, struct), + default=statement._custom_cbor_encoder, + ) # decode the cbor encoded cose sign1 message message = Sign1Message.decode(signed_statement) @@ -189,4 +195,4 @@ def test_create_hashed_statement_remote_sign(self): # NOTICE: This just verifies the issuer signature, not the content of # the statement, the counter signature by the transparency service, or # its inclusion on a log. - self.assertTrue(verified) \ No newline at end of file + self.assertTrue(verified) diff --git a/unittests/test_create_signed_statement.py b/unittests/test_create_signed_statement.py index c2c348f..13f0fe7 100644 --- a/unittests/test_create_signed_statement.py +++ b/unittests/test_create_signed_statement.py @@ -16,12 +16,13 @@ from datatrails_scitt_samples.statement_creation import create_signed_statement from datatrails_scitt_samples.cbor_header_labels import ( - HEADER_LABEL_CWT, HEADER_LABEL_CWT_CNF, HEADER_LABEL_CNF_COSE_KEY, ) from .constants import KNOWN_STATEMENT +from .create_options import create_options, get_cwt_phdr + class TestCreateSignedStatement(unittest.TestCase): """ @@ -55,6 +56,7 @@ def test_sign_and_verifiy_statement(self): payload=payload, payload_location=payload_location, signing_key=signing_key, + **create_options, ) # verify the signed statement @@ -63,7 +65,7 @@ def test_sign_and_verifiy_statement(self): message = Sign1Message.decode(signed_statement) # get the verification key from cwt cnf - cwt = message.phdr[HEADER_LABEL_CWT] + cwt = get_cwt_phdr(message.phdr) cnf = cwt[HEADER_LABEL_CWT_CNF] verification_key = cnf[HEADER_LABEL_CNF_COSE_KEY] diff --git a/unittests/test_register_signed_statement.py b/unittests/test_register_signed_statement.py index 26b6c82..7e5b8da 100644 --- a/unittests/test_register_signed_statement.py +++ b/unittests/test_register_signed_statement.py @@ -35,7 +35,6 @@ def setUp(self): def tearDown(self): shutil.rmtree(self.test_dir) - @unittest.skipUnless( os.getenv("DATATRAILS_CLIENT_SECRET") != "", "test requires authentication via env DATATRAILS_xxx", @@ -49,6 +48,7 @@ def test_create_and_register_statement(self): # create a signed statement create_hashed_signed_statement( [ + "--use-draft-04-labels", # TEMPORY: Until backend support catches up "--signing-key-file", "my-signing-key.pem", "--payload-file",