diff --git a/credentials/generate-revocation-set.py b/credentials/generate-revocation-set.py index 4cdcfdbad1ae26..f345712512ae80 100644 --- a/credentials/generate-revocation-set.py +++ b/credentials/generate-revocation-set.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 - # # Copyright (c) 2023-2024 Project CHIP Authors # @@ -16,13 +15,20 @@ # limitations under the License. # -# Generates a basic RevocationSet from TestNet +# Generates a basic RevocationSet from TestNet or MainNet. +# Note: Indirect CRLs are only supported with py cryptography version 44.0.0. +# You may need to patch in a change locally if you are using an older +# version of py cryptography. The required changes can be viewed in this +# PR: https://github.com/pyca/cryptography/pull/11467/files. The file that +# needs to be patched is accessible from your local connectedhomeip +# directory at ./.environment/pigweed-venv/lib/python3.11/site-packages/cryptography/x509/extensions.py # Usage: # python ./credentials/generate-revocation-set.py --help import base64 import json import logging +import re import subprocess import sys from enum import Enum @@ -93,28 +99,10 @@ def parse_vid_pid_from_distinguished_name(distinguished_name): def get_akid(cert: x509.Certificate) -> Optional[bytes]: - try: - return cert.extensions.get_extension_for_oid(x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier - except Exception: - logging.warning("AKID not found in certificate") - return None - + return cert.extensions.get_extension_for_oid(x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier def get_skid(cert: x509.Certificate) -> Optional[bytes]: - try: - return cert.extensions.get_extension_for_oid(x509.OID_SUBJECT_KEY_IDENTIFIER).value.key_identifier - except Exception: - logging.warning("SKID not found in certificate") - return None - - -def get_subject_b64(cert: x509.Certificate) -> str: - return base64.b64encode(cert.subject.public_bytes()).decode('utf-8') - - -def get_issuer_b64(cert: x509.Certificate) -> str: - return base64.b64encode(cert.issuer.public_bytes()).decode('utf-8') - + return cert.extensions.get_extension_for_oid(x509.OID_SUBJECT_KEY_IDENTIFIER).value.key_identifier def verify_cert(cert: x509.Certificate, root: x509.Certificate) -> bool: ''' @@ -134,7 +122,7 @@ def verify_cert(cert: x509.Certificate, root: x509.Certificate) -> bool: try: root.public_key().verify(cert.signature, cert.tbs_certificate_bytes, ec.ECDSA(cert.signature_hash_algorithm)) except Exception: - logging.warning(f"Signature verification failed for cert subject: {get_subject_b64(cert)}, issuer: {get_issuer_b64(cert)}") + logging.warning(f"Signature verification failed for cert subject: {cert.subject.rfc4514_string()}, issuer: {cert.issuer.rfc4514_string()}") return False return True @@ -194,31 +182,120 @@ def fetch_crl_from_url(url: str, timeout: int) -> x509.CertificateRevocationList try: r = requests.get(url, timeout=timeout) return x509.load_der_x509_crl(r.content) - except Exception: - logging.error('Failed to fetch a valid CRL') + except Exception as e: + logging.error('Failed to fetch a valid CRL', e) -class DCLDClient: +class DCLDClientInterface: + ''' + An interface for interacting with DCLD. ''' - A client for interacting with DCLD using either the REST API or command line interface (CLI). + def send_get_request(self, url: str) -> dict: + ''' + Send a GET request for a json object. + ''' + try: + response = requests.get(url).json() + return response + except Exception as e: + logging.error(f"Failed to fetch {url}: {e}") + return None + + + def get_revocation_points(self) -> list[dict]: + ''' + Get revocation points from DCL + + Returns + ------- + list[dict] + List of revocation points + ''' + raise NotImplementedError + + def get_revocations_points_by_skid(self, issuer_subject_key_id) -> list[dict]: + ''' + Get revocation points by subject key ID + + Parameters + ---------- + issuer_subject_key_id: str + Subject key ID + + Returns + ------- + list[dict] + List of revocation points + ''' + raise NotImplementedError + + def get_approved_certificate(self, subject_name: x509.name.Name, skid_hex: str) -> tuple[bool, x509.Certificate]: + ''' + Get certificate from DCL + ''' + raise NotImplementedError + + def get_only_approved_certificate(self, response: dict, skid_hex: str) -> tuple[bool, Optional[x509.Certificate]]: + ''' + Get only approved certificate from DCL resposne. + ''' + if response is None or not response.get("approvedCertificates", {}).get("certs", []): + raise requests.exception.NotFound(f"No certificate found for {skid_hex}") + if len(response["approvedCertificates"]["certs"]) > 1: + raise ValueError(f"Multiple certificates found for {skid_hex}") + issuer_certificate = x509.load_pem_x509_certificate(bytes(response["approvedCertificates"]["certs"][0]["pemCert"], "utf-8")) + return response["approvedCertificates"]["certs"][0]["isRoot"], issuer_certificate + + def get_paa_cert(self, initial_cert: x509.Certificate) -> Optional[x509.Certificate]: + ''' + Get the PAA certificate for the CRL Signer Certificate. + ''' + issuer_name = initial_cert.issuer + akid = get_akid(initial_cert) + if akid is None: + logging.error('Failed to get PAA certificate') + return + paa_certificate = None + while not paa_certificate: + try: + akid_hex = akid.hex().upper() + is_root, issuer_certificate = self.get_approved_certificate(issuer_name, akid_hex) + if is_root: + paa_certificate = issuer_certificate + break + + except Exception as e: + logging.error('Failed to get PAA certificate', e) + return + logging.debug(f"issuer_name: {issuer_certificate.subject.rfc4514_string()}") + issuer_name = issuer_certificate.issuer + akid = get_akid(issuer_certificate) + logging.debug(f"akid: {akid}") + if paa_certificate is None: + logging.warning("PAA Certificate not found, continue...") + return paa_certificate + + def get_b64_name(self, name: x509.name.Name) -> str: + ''' + Get base64 encoded name + ''' + return base64.b64encode(name.public_bytes()).decode('utf-8') +class DCLDClient(DCLDClientInterface): + ''' + A client for interacting with DCLD using command line interface (CLI). ''' - def __init__(self, use_rest: bool, dcld_exe: str, production: bool, rest_node_url: str): + def __init__(self, dcld_exe: str, production: bool, rest_node_url: str): ''' Initialize the client - use_rest: bool - Use RESTful API with HTTPS against `rest_node_url` dcld_exe: str Path to `dcld` executable production: bool Use MainNet DCL URL with dcld executable - rest_node_url: str - RESTful API URL ''' - self.use_rest = use_rest self.dcld_exe = dcld_exe self.production = production self.rest_node_url = rest_node_url @@ -272,57 +349,70 @@ def get_revocation_points(self) -> list[dict]: List of revocation points ''' - if self.use_rest: - response = requests.get(f"{self.rest_node_url}/dcl/pki/revocation-points").json() - else: - response = self.get_dcld_cmd_output_json(['query', 'pki', 'all-revocation-points']) - + response = self.get_dcld_cmd_output_json(['query', 'pki', 'all-revocation-points']) return response["PkiRevocationDistributionPoint"] - def get_issuer_cert(self, cert: x509.Certificate) -> Optional[x509.Certificate]: + def get_revocations_points_by_skid(self, issuer_subject_key_id) -> list[dict]: ''' - Get the issuer certificate for + Get revocation points by subject key ID Parameters ---------- - cert: x509.Certificate - Certificate + issuer_subject_key_id: str + Subject key ID Returns ------- - str - Issuer certificate in PEM format + list[dict] + List of revocation points ''' - issuer_name_b64 = get_issuer_b64(cert) - akid = get_akid(cert) - if akid is None: - return - # Convert CRL Signer AKID to colon separated hex - akid_hex = akid.hex().upper() - akid_hex = ':'.join([akid_hex[i:i+2] for i in range(0, len(akid_hex), 2)]) + response = self.get_dcld_cmd_output_json(['query', 'pki', 'revocation-points', + '--issuer-subject-key-id', issuer_subject_key_id]) + logging.debug(f"Response revocation points: {response}") + return response["pkiRevocationDistributionPointsByIssuerSubjectKeyID"]["points"] + def get_approved_certificate(self, subject_name: x509.name.Name, skid_hex: str) -> tuple[bool, x509.Certificate]: + ''' + Get certificate from DCL + ''' + subject_name_b64 = self.get_b64_name(subject_name) + query_cmd_list = ['query', 'pki', 'x509-cert', '-u', subject_name_b64, '-k', skid_hex] logging.debug( - f"Fetching issuer from:{self.rest_node_url}/dcl/pki/certificates/{issuer_name_b64}/{akid_hex}") + f"Fetching issuer from dcl query{' '.join(query_cmd_list)}") + response = self.get_dcld_cmd_output_json(query_cmd_list) + return self.get_only_approved_certificate(response, skid_hex) - if self.use_rest: - response = requests.get( - f"{self.rest_node_url}/dcl/pki/certificates/{issuer_name_b64}/{akid_hex}").json() - else: - response = self.get_dcld_cmd_output_json( - ['query', 'pki', 'x509-cert', '-u', issuer_name_b64, '-k', akid_hex]) - issuer_certificate = response["approvedCertificates"]["certs"][0]["pemCert"] +class RESTDCLDClient(DCLDClientInterface): + ''' + A client for interacting with DCLD using the REST API. + ''' - logging.debug(f"issuer: {issuer_certificate}") + def __init__(self, rest_node_url: str): + ''' + Initialize the client - try: - issuer_certificate_object = x509.load_pem_x509_certificate(bytes(issuer_certificate, 'utf-8')) - except Exception: - logging.error('Failed to parse PAA certificate') - return + rest_node_url: str + RESTful API URL + ''' + if not re.match(r"^https://(on|on.test-net)\.dcl\.csa-iot\.(org|org/)$", rest_node_url): + raise ValueError(f"Invalid RESTful API URL: {rest_node_url}") - return issuer_certificate_object + self.rest_node_url = rest_node_url + + def get_revocation_points(self) -> list[dict]: + ''' + Get revocation points from DCL + + Returns + ------- + list[dict] + List of revocation points + ''' + + response = self.send_get_request(f"{self.rest_node_url}/dcl/pki/revocation-points") + return response["PkiRevocationDistributionPoint"] def get_revocations_points_by_skid(self, issuer_subject_key_id) -> list[dict]: ''' @@ -339,14 +429,20 @@ def get_revocations_points_by_skid(self, issuer_subject_key_id) -> list[dict]: List of revocation points ''' - if self.use_rest: - response = requests.get(f"{self.rest_node_url}/dcl/pki/revocation-points/{issuer_subject_key_id}").json() - else: - response = self.get_dcld_cmd_output_json(['query', 'pki', 'revocation-points', - '--issuer-subject-key-id', issuer_subject_key_id]) - + response = self.send_get_request(f"{self.rest_node_url}/dcl/pki/revocation-points/{issuer_subject_key_id}") + logging.debug(f"Response revocation points: {response}") return response["pkiRevocationDistributionPointsByIssuerSubjectKeyID"]["points"] + def get_approved_certificate(self, subject_name: x509.name.Name, skid_hex: str) -> tuple[bool, x509.Certificate]: + ''' + Get certificate from DCL + ''' + subject_name_b64 = self.get_b64_name(subject_name) + skid_hex_formatted = ':'.join([skid_hex[i:i+2] for i in range(0, len(skid_hex), 2)]) + logging.debug( + f"Fetching issuer from:{self.rest_node_url}/dcl/pki/certificates/{subject_name_b64}/{skid_hex_formatted}") + response = self.send_get_request(f"{self.rest_node_url}/dcl/pki/certificates/{subject_name_b64}/{skid_hex_formatted}") + return self.get_only_approved_certificate(response, skid_hex) @click.command() @click.help_option('-h', '--help') @@ -383,7 +479,10 @@ def main(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool rest_node_url = PRODUCTION_NODE_URL_REST if production else TEST_NODE_URL_REST - dcld_client = DCLDClient(use_rest, dcld, production, rest_node_url) + if use_rest: + dcld_client = RESTDCLDClient(rest_node_url) + else: + dcld_client = DCLDClient(dcld, production, rest_node_url) revocation_point_list = dcld_client.get_revocation_points() @@ -418,12 +517,12 @@ def main(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool continue # 5. Validate the certification path containing CRLSignerCertificate. - paa_certificate_object = dcld_client.get_issuer_cert(crl_signer_certificate) + paa_certificate_object = dcld_client.get_paa_cert(crl_signer_certificate) if paa_certificate_object is None: logging.warning("PAA Certificate not found, continue...") continue - if validate_cert_chain(crl_signer_certificate, crl_signer_delegator_cert, paa_certificate_object) is False: + if not validate_cert_chain(crl_signer_certificate, crl_signer_delegator_cert, paa_certificate_object): logging.warning("Failed to validate CRL Signer Certificate chain, continue...") continue @@ -448,8 +547,9 @@ def main(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool if count_with_matching_vid_issuer_skid > 1: try: - issuing_distribution_point = crl_file.extensions.get_extension_for_oid( - x509.OID_ISSUING_DISTRIBUTION_POINT).value + issuing_distribution_point = crl_file.extensions.get_extension_for_oid( + x509.oid.ExtensionOID.ISSUING_DISTRIBUTION_POINT + ).value except Exception: logging.warning("CRL Issuing Distribution Point not found, continue...") continue @@ -460,26 +560,26 @@ def main(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool logging.warning("CRL Issuing Distribution Point URI is not CRL URL, continue...") continue else: - logging.warning("CRL Issuing Distribution Point URI is not CRL URL, continue...") + logging.warning("Distribution Point does not contain a single URI, continue...") continue # TODO: 8. Validate CRL as per Section 6.3 of RFC 5280 - # 9. decide on certificate authority name and AKID + # 9. Decide on certificate authority to match against CRL entries. if revocation_point["isPAA"] and not is_self_signed_certificate(crl_signer_certificate): - certificate_authority_name_b64 = get_subject_b64(paa_certificate_object) - certificate_akid = get_skid(paa_certificate_object) + certificate_authority_name = crl_signer_certificate.issuer + certificate_akid = get_akid(crl_signer_certificate) elif crl_signer_delegator_cert: - certificate_authority_name_b64 = get_subject_b64(crl_signer_delegator_cert) + certificate_authority_name = crl_signer_delegator_cert.subject certificate_akid = get_skid(crl_signer_delegator_cert) else: - certificate_authority_name_b64 = get_subject_b64(crl_signer_certificate) + certificate_authority_name = crl_signer_certificate.subject certificate_akid = get_skid(crl_signer_certificate) # validate issuer skid matchces with the one in revocation points certificate_akid_hex = ''.join('{:02X}'.format(x) for x in certificate_akid) - logging.debug(f"Certificate Authority Name: {certificate_authority_name_b64}") + logging.debug(f"Certificate Authority Name: {certificate_authority_name.rfc4514_string()}") logging.debug(f"Certificate AKID: {certificate_akid_hex}") logging.debug(f"revocation_point['issuerSubjectKeyID']: {revocation_point['issuerSubjectKeyID']}") @@ -492,12 +592,11 @@ def main(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool for revoked_cert in crl_file: try: revoked_cert_issuer = revoked_cert.extensions.get_extension_for_oid( - x509.CRLEntryExtensionOID.CERTIFICATE_ISSUER).value.get_values_for_type(x509.DirectoryName).value - + x509.CRLEntryExtensionOID.CERTIFICATE_ISSUER).value.get_values_for_type(x509.DirectoryName)[0] if revoked_cert_issuer is not None: # check if this really are the same thing - if revoked_cert_issuer != certificate_authority_name_b64: - logging.warning("CRL Issuer is not CRL File Issuer, continue...") + if revoked_cert_issuer != x509.DirectoryName(certificate_authority_name).value: + logging.warning("CRL entry issuer is not CRL File Issuer, continue...") continue except Exception: logging.warning("certificateIssuer entry extension not found in CRL") @@ -508,7 +607,7 @@ def main(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool entry = { "type": "revocation_set", "issuer_subject_key_id": certificate_akid_hex, - "issuer_name": certificate_authority_name_b64, + "issuer_name": certificate_authority_name.rfc4514_string(), "revoked_serial_numbers": serialnumber_list, "crl_signer_cert": revocation_point["crlSignerCertificate"], }