Skip to content

Commit

Permalink
Dev/robin/10150 cwt claims label change (#35)
Browse files Browse the repository at this point in the history
* feat: 10150 support for draft-ietf-cose-cwt-claims-in-headers-10#section-2

The cwt_claims label in the scitt headers is now 15 rather than 13

statements are always created with the new label. The old label is a
fall back case when retrieving the claims from the headers using
`cnf_key_from_phdr`

* feat: temporary support for legacy header to ease integration

---------

Co-authored-by: Robin Bryce <[email protected]>
  • Loading branch information
robinbryce and Robin Bryce authored Nov 15, 2024
1 parent c115e91 commit ddc439a
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 24 deletions.
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,17 @@
.env.*
.envrc
.vscode/launch.json
build/
dist/
payload.json
scitt/artifacts/_manifest/*
datatrails_scitt_samples/__pycache__/*
datatrails_scitt_samples/scripts/__pycache__/*
datatrails_scitt_samples/datatrails/__pycache__/*
datatrails_scitt_samples/mmriver/__pycache__/*
venv/*
payload.txt
signed-statement.cbor
statement-receipt.cbor
transparent-statement.cbor
verified_payload.txt
9 changes: 7 additions & 2 deletions datatrails_scitt_samples/cbor_header_labels.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
"""Definitions of all COSE, SCITT, CBOR labels used by these exmaples"""

# CWT header label comes from version 4 of the scitt architecture document
# CWT header label previously came from version 4 of the scitt architecture document
# https://www.ietf.org/archive/id/draft-ietf-scitt-architecture-04.html#name-issuer-identity
HEADER_LABEL_CWT = 13
HEADER_LABEL_CWT_SCITT_DRAFT_04 = 13

# CWT header label is defined by
# https://datatracker.ietf.org/doc/html/draft-ietf-cose-cwt-claims-in-headers-10#section-2
# And referenced by https://www.ietf.org/archive/id/draft-ietf-scitt-architecture-10.html#name-signed-statements
HEADER_LABEL_CWT = 15

# subject header label comes from version 2 of the scitt architecture document
# https://www.ietf.org/archive/id/draft-birkholz-scitt-architecture-02.html#name-envelope-and-claim-format
Expand Down
10 changes: 8 additions & 2 deletions datatrails_scitt_samples/cose_cnf_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
from pycose.keys.keytype import KtyEC2
from pycose.keys.keyparam import KpKty, KpKeyOps, EC2KpCurve

from datatrails_scitt_samples.cbor_header_labels import HEADER_LABEL_CWT
from datatrails_scitt_samples.cbor_header_labels import (
HEADER_LABEL_CWT,
HEADER_LABEL_CWT_SCITT_DRAFT_04,
)
from datatrails_scitt_samples.cbor_header_labels import HEADER_LABEL_CWT_CNF
from datatrails_scitt_samples.cbor_header_labels import HEADER_LABEL_CNF_COSE_KEY

Expand All @@ -20,7 +23,10 @@ def cnf_key_from_phdr(phdr: dict) -> CoseKey:
"""
cwt_claims = phdr.get(HEADER_LABEL_CWT)
if cwt_claims is None:
raise ValueError("Missing cwt claims in protected header")
# fall back to scitt draft 04
cwt_claims = phdr.get(HEADER_LABEL_CWT_SCITT_DRAFT_04)
if cwt_claims is None:
raise ValueError("Missing cwt claims in protected header")

# Note: issuer is the key vault key identity, subject is the tenant's merkle log tile path
cnf_claim = cwt_claims.get(HEADER_LABEL_CWT_CNF)
Expand Down
12 changes: 12 additions & 0 deletions datatrails_scitt_samples/scripts/create_hashed_signed_statement.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import sys

from datatrails_scitt_samples.statement_creation import create_hashed_signed_statement
from datatrails_scitt_samples.statement_creation import OPTION_USE_DRAFT_04_LABELS
from datatrails_scitt_samples.scripts.fileaccess import read_file, open_signing_key
from hashlib import sha256

Expand Down Expand Up @@ -83,13 +84,23 @@ def main(args=None):
help="subject to correlate statements made about an artifact.",
)

parser.add_argument(
"--use-draft-04-labels",
help="force use of legacy labels (eg cwt_claims label 13 rather than 15)",
action="store_true",
)

args = parser.parse_args(args or sys.argv[1:])

if args.metadata_file is not None:
meta_map_dict = json.loads(read_file(args.metadata_file))
else:
meta_map_dict = {}

options = {}
if args.use_draft_04_labels:
options[OPTION_USE_DRAFT_04_LABELS] = True

signing_key = open_signing_key(args.signing_key_file)
payload_contents = read_file(args.payload_file)
payload_hash = sha256(payload_contents.encode("utf-8")).digest()
Expand All @@ -103,6 +114,7 @@ def main(args=None):
payload_location=args.payload_location,
signing_key=signing_key,
subject=args.subject,
**options,
)

with open(args.output_file, "wb") as output_file:
Expand Down
12 changes: 12 additions & 0 deletions datatrails_scitt_samples/scripts/create_signed_statement.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from datatrails_scitt_samples.scripts.fileaccess import read_file, open_signing_key
from datatrails_scitt_samples.statement_creation import create_signed_statement
from datatrails_scitt_samples.statement_creation import OPTION_USE_DRAFT_04_LABELS


def main(args=None):
Expand Down Expand Up @@ -87,6 +88,12 @@ def main(args=None):
default="signed-statement.cbor",
)

parser.add_argument(
"--use-draft-04-labels",
help="force use of legacy labels (eg cwt_claims label 13 rather than 15)",
action="store_true",
)

args = parser.parse_args(args or sys.argv[1:])

if args.metadata_file is not None:
Expand All @@ -98,6 +105,10 @@ def main(args=None):
# Payload must be encoded to bytes
payload = read_file(args.payload_file).encode("utf-8")

options = {}
if args.use_draft_04_labels:
options[OPTION_USE_DRAFT_04_LABELS] = True

signed_statement = create_signed_statement(
content_type=args.content_type,
issuer=args.issuer,
Expand All @@ -107,6 +118,7 @@ def main(args=None):
payload_location=args.payload_location,
subject=args.subject,
signing_key=signing_key,
**options,
)

with open(args.output_file, "wb") as output_file:
Expand Down
23 changes: 20 additions & 3 deletions datatrails_scitt_samples/statement_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
COSE_TYPE,
HEADER_LABEL_FEED,
HEADER_LABEL_CWT,
HEADER_LABEL_CWT_SCITT_DRAFT_04,
HEADER_LABEL_CWT_ISSUER,
HEADER_LABEL_CWT_SUBJECT,
HEADER_LABEL_CWT_CNF,
Expand All @@ -32,6 +33,8 @@
HEADER_LABEL_COSE_ALG_SHA512,
)

OPTION_USE_DRAFT_04_LABELS = "draft_04_labels"


# pylint: disable=too-many-positional-arguments
def create_hashed_signed_statement(
Expand All @@ -44,6 +47,7 @@ def create_hashed_signed_statement(
payload_location: str,
signing_key: SigningKey,
subject: str,
**kwargs,
) -> bytes:
"""
creates a hashed signed statement, given the signing_key, payload, subject and issuer
Expand All @@ -63,6 +67,7 @@ def create_hashed_signed_statement(
payload_location,
verifying_key,
subject,
**kwargs,
)

# create the cose_key to locally sign the statement using the signing key
Expand All @@ -85,6 +90,7 @@ def create_hashed_statement(
payload_location: str,
verifying_key: VerifyingKey,
subject: str,
**kwargs,
) -> Sign1Message:
"""
creates a hashed signed statement, given the verification_key, payload, subject and issuer
Expand All @@ -106,7 +112,11 @@ def create_hashed_statement(
# the verification key is attached to the cwt claims
protected_header[Algorithm] = Es256
protected_header[KID] = kid
protected_header[HEADER_LABEL_CWT] = cwt
cwt_label = HEADER_LABEL_CWT
if kwargs.get(OPTION_USE_DRAFT_04_LABELS):
cwt_label = HEADER_LABEL_CWT_SCITT_DRAFT_04

protected_header[cwt_label] = cwt

# create the statement as a sign1 message using the protected header and payload
return Sign1Message(phdr=protected_header, payload=payload)
Expand All @@ -122,6 +132,7 @@ def create_signed_statement(
issuer: str,
content_type: str,
payload_location: str,
**kwargs,
) -> bytes:
"""
creates a signed statement, given the signing_key, payload, subject and issuer
Expand All @@ -132,7 +143,7 @@ def create_signed_statement(
raise ValueError("signing key does not have a verifying key")

statement = create_statement(
kid, meta_map, verifying_key, payload, subject, issuer, content_type
kid, meta_map, verifying_key, payload, subject, issuer, content_type, **kwargs
)

# create the cose_key for locally signing the statement
Expand All @@ -153,6 +164,7 @@ def create_statement(
subject: str,
issuer: str,
content_type: str,
**kwargs,
) -> Sign1Message:
"""
creates a statement, given the verification_key, payload, subject and issuer.
Expand All @@ -169,7 +181,12 @@ def create_statement(
protected_header = inline_payload_protected_header(subject, content_type, meta_map)
protected_header[Algorithm] = Es256
protected_header[KID] = kid
protected_header[HEADER_LABEL_CWT] = cwt

cwt_label = HEADER_LABEL_CWT
if kwargs.get(OPTION_USE_DRAFT_04_LABELS):
cwt_label = HEADER_LABEL_CWT_SCITT_DRAFT_04

protected_header[cwt_label] = cwt

protected_header = {
Algorithm: Es256,
Expand Down
12 changes: 12 additions & 0 deletions unittests/create_options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from datatrails_scitt_samples.statement_creation import OPTION_USE_DRAFT_04_LABELS
from datatrails_scitt_samples.cbor_header_labels import (
HEADER_LABEL_CWT,
HEADER_LABEL_CWT_SCITT_DRAFT_04,
)

# Use this until the backend support for cwt label 15 is available
create_options = {OPTION_USE_DRAFT_04_LABELS: True}


def get_cwt_phdr(phdr):
return phdr.get(HEADER_LABEL_CWT) or phdr.get(HEADER_LABEL_CWT_SCITT_DRAFT_04)
34 changes: 20 additions & 14 deletions unittests/test_create_hashed_signed_statement.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Pairwise unit tests for creating a signed statement with a hashed payload
"""

import unittest
import json

Expand All @@ -21,19 +22,14 @@
from datatrails_scitt_samples.statement_creation import (
cose_key_ec2_p256,
create_hashed_signed_statement,
create_hashed_statement
create_hashed_statement,
)

from datatrails_scitt_samples.cose_sign1message import (
extract_to_be_signed
)
from datatrails_scitt_samples.cose_sign1message import extract_to_be_signed

from datatrails_scitt_samples.cose_cnf_key import (
cnf_key_from_phdr
)
from datatrails_scitt_samples.cose_cnf_key import cnf_key_from_phdr

from datatrails_scitt_samples.cbor_header_labels import (
HEADER_LABEL_CWT,
HEADER_LABEL_CWT_CNF,
HEADER_LABEL_CNF_COSE_KEY,
HEADER_LABEL_PAYLOAD_HASH_ALGORITHM,
Expand All @@ -42,6 +38,8 @@

from .constants import KNOWN_STATEMENT

from .create_options import create_options, get_cwt_phdr


class TestCreateHashedSignedStatement(unittest.TestCase):
"""
Expand Down Expand Up @@ -79,6 +77,7 @@ def test_sign_and_verify_statement(self):
payload_hash_alg=payload_hash_alg,
payload_location=payload_location,
signing_key=signing_key,
**create_options,
)

# decode the cbor encoded cose sign1 message
Expand All @@ -94,7 +93,7 @@ def test_sign_and_verify_statement(self):
self.assertEqual(payload_location, message.phdr[HEADER_LABEL_LOCATION])

# get the verification key from cwt cnf
cwt = message.phdr[HEADER_LABEL_CWT]
cwt = get_cwt_phdr(message.phdr)
cnf = cwt[HEADER_LABEL_CWT_CNF]
verification_key = cnf[HEADER_LABEL_CNF_COSE_KEY]

Expand All @@ -115,7 +114,6 @@ def test_sign_and_verify_statement(self):

self.assertTrue(verified)


def test_create_hashed_statement_remote_sign(self):
"""Test using the samples api to accomplish remote issuer signing"""

Expand Down Expand Up @@ -151,6 +149,7 @@ def test_create_hashed_statement_remote_sign(self):
payload_hash_alg=payload_hash_alg,
payload_location=payload_location,
verifying_key=verifying_key,
**create_options,
)

# This is essentially compute_signature() from pycose's SignCommon (base of Sign1Message)
Expand All @@ -165,16 +164,23 @@ def test_create_hashed_statement_remote_sign(self):
# Receive signature bytes in response and set them on the statement
signature = alg.sign(key=cose_signing_key, data=to_be_signed)


# Now, locally, complete serialization of the statement with the signature attached

# This would be nice, but pycose doesn't appear to support it
# statement.signature = signature
# signed_statement = statement.encode(sign=False)

# Instead, we'll just encode directly following the implementation of encod
struct = [statement.phdr_encoded, statement.uhdr_encoded, statement.payload, signature]
signed_statement = cbor2.dumps(cbor2.CBORTag(statement.cbor_tag, struct), default=statement._custom_cbor_encoder)
struct = [
statement.phdr_encoded,
statement.uhdr_encoded,
statement.payload,
signature,
]
signed_statement = cbor2.dumps(
cbor2.CBORTag(statement.cbor_tag, struct),
default=statement._custom_cbor_encoder,
)

# decode the cbor encoded cose sign1 message
message = Sign1Message.decode(signed_statement)
Expand All @@ -189,4 +195,4 @@ def test_create_hashed_statement_remote_sign(self):
# NOTICE: This just verifies the issuer signature, not the content of
# the statement, the counter signature by the transparency service, or
# its inclusion on a log.
self.assertTrue(verified)
self.assertTrue(verified)
6 changes: 4 additions & 2 deletions unittests/test_create_signed_statement.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@

from datatrails_scitt_samples.statement_creation import create_signed_statement
from datatrails_scitt_samples.cbor_header_labels import (
HEADER_LABEL_CWT,
HEADER_LABEL_CWT_CNF,
HEADER_LABEL_CNF_COSE_KEY,
)
from .constants import KNOWN_STATEMENT

from .create_options import create_options, get_cwt_phdr


class TestCreateSignedStatement(unittest.TestCase):
"""
Expand Down Expand Up @@ -55,6 +56,7 @@ def test_sign_and_verifiy_statement(self):
payload=payload,
payload_location=payload_location,
signing_key=signing_key,
**create_options,
)

# verify the signed statement
Expand All @@ -63,7 +65,7 @@ def test_sign_and_verifiy_statement(self):
message = Sign1Message.decode(signed_statement)

# get the verification key from cwt cnf
cwt = message.phdr[HEADER_LABEL_CWT]
cwt = get_cwt_phdr(message.phdr)
cnf = cwt[HEADER_LABEL_CWT_CNF]
verification_key = cnf[HEADER_LABEL_CNF_COSE_KEY]

Expand Down
2 changes: 1 addition & 1 deletion unittests/test_register_signed_statement.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ def setUp(self):
def tearDown(self):
shutil.rmtree(self.test_dir)


@unittest.skipUnless(
os.getenv("DATATRAILS_CLIENT_SECRET") != "",
"test requires authentication via env DATATRAILS_xxx",
Expand All @@ -49,6 +48,7 @@ def test_create_and_register_statement(self):
# create a signed statement
create_hashed_signed_statement(
[
"--use-draft-04-labels", # TEMPORY: Until backend support catches up
"--signing-key-file",
"my-signing-key.pem",
"--payload-file",
Expand Down

0 comments on commit ddc439a

Please sign in to comment.