From 6280b897e0da2cf2bf8f92dd1a6f30a9c696529c Mon Sep 17 00:00:00 2001 From: Shubham Patil Date: Mon, 23 Dec 2024 19:03:45 +0530 Subject: [PATCH] da_revocation: function out generation of revocation set from the crl --- credentials/generate-revocation-set.py | 139 +++++++++++++++++-------- 1 file changed, 96 insertions(+), 43 deletions(-) diff --git a/credentials/generate-revocation-set.py b/credentials/generate-revocation-set.py index 509eee56008132..e122bdd27c57cf 100644 --- a/credentials/generate-revocation-set.py +++ b/credentials/generate-revocation-set.py @@ -199,6 +199,97 @@ def fetch_crl_from_url(url: str, timeout: int) -> x509.CertificateRevocationList logging.error('Failed to fetch a valid CRL') +def generate_revocation_set_from_crl(crl_file: x509.CertificateRevocationList, + crl_signer_certificate: x509.Certificate, + certificate_authority_name_b64: str, + certificate_akid_hex: str, + crl_signer_delegator_cert: x509.Certificate) -> dict: + """Generate a revocation set from a CRL file. + + Args: + crl_file: The CRL object containing revoked certificates + crl_signer_certificate: The certificate object used to sign the CRL + certificate_authority_name_b64: Base64 encoded issuer name + certificate_akid_hex: Hex encoded Authority Key Identifier + crl_signer_delegator_cert: crl signer delegator certificate object + + Returns: + dict: A dictionary containing the revocation set data with fields: + - type: "revocation_set" + - issuer_subject_key_id: Authority Key Identifier (hex) + - issuer_name: Issuer name (base64) + - revoked_serial_numbers: List of revoked serial numbers + - crl_signer_cert: CRL signer certificate (base64 DER) + - crl_signer_delegator: Optional delegator certificate (base64 DER) + """ + serialnumber_list = [] + + for revoked_cert in crl_file: + try: + revoked_cert_issuer = revoked_cert.extensions.get_extension_for_oid( + x509.CRLEntryExtensionOID.CERTIFICATE_ISSUER).value.get_values_for_type(x509.DirectoryName).value + + if revoked_cert_issuer is not None: + # check if this really are the same thing + if revoked_cert_issuer != certificate_authority_name_b64: + logging.warning("CRL Issuer is not CRL File Issuer, continue...") + continue + except Exception: + pass + + serialnumber_list.append(bytes(str('{:02X}'.format(revoked_cert.serial_number)), 'utf-8').decode('utf-8')) + + entry = { + "type": "revocation_set", + "issuer_subject_key_id": certificate_akid_hex, + "issuer_name": certificate_authority_name_b64, + "revoked_serial_numbers": serialnumber_list, + "crl_signer_cert": base64.b64encode(crl_signer_certificate.public_bytes(serialization.Encoding.DER)).decode('utf-8'), + } + + if crl_signer_delegator_cert: + entry["crl_signer_delegator"] = base64.b64encode( + crl_signer_delegator_cert.public_bytes(serialization.Encoding.DER)).decode('utf-8') + + return entry + + +# This is implemented as per point (9) in 6.2.4.1. Conceptual algorithm for revocation set construction +def get_certificate_authority_details(crl_signer_certificate: x509.Certificate, + crl_signer_delegator_cert: x509.Certificate, + paa_certificate_object: x509.Certificate, + is_paa: bool) -> tuple[str, str]: + """Get certificate authority name and AKID based on certificate hierarchy. + + Args: + crl_signer_certificate: The CRL signer certificate + crl_signer_delegator_cert: Optional delegator certificate + paa_certificate_object: Optional PAA certificate + is_paa: Whether this is a PAA certificate + + Returns: + tuple[str, str]: (certificate_authority_name_b64, certificate_akid_hex) + """ + if is_paa and not is_self_signed_certificate(crl_signer_certificate): + cert_for_details = paa_certificate_object + logging.debug("Using PAA certificate for details") + elif crl_signer_delegator_cert: + cert_for_details = crl_signer_delegator_cert + logging.debug("Using CRL Signer Delegator certificate for details") + else: + cert_for_details = crl_signer_certificate + logging.debug("Using CRL Signer certificate for details") + + certificate_authority_name_b64 = get_subject_b64(cert_for_details) + certificate_akid = get_skid(cert_for_details) + certificate_akid_hex = ''.join('{:02X}'.format(x) for x in certificate_akid) + + logging.debug(f"Certificate Authority Name: {certificate_authority_name_b64}") + logging.debug(f"Certificate AKID: {certificate_akid_hex}") + + return certificate_authority_name_b64, certificate_akid_hex + + class DCLDClient: ''' A client for interacting with DCLD using either the REST API or command line interface (CLI). @@ -325,7 +416,7 @@ def get_issuer_cert(self, cert: x509.Certificate) -> Optional[x509.Certificate]: return issuer_certificate_object - def get_revocations_points_by_skid(self, issuer_subject_key_id) -> list[dict]: + def get_revocations_points_by_skid(self, issuer_subject_key_id: str) -> list[dict]: ''' Get revocation points by subject key ID @@ -364,7 +455,6 @@ def get_revocations_points_by_skid(self, issuer_subject_key_id) -> list[dict]: help='Determines the verbosity of script output') def main(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool, use_test_net_http: bool, output: str, log_level: str): """Tool to construct revocation set from DCL""" - logging.basicConfig( level=log_level, format='%(asctime)s %(name)s %(levelname)-7s %(message)s', @@ -467,56 +557,19 @@ def main(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool # TODO: 8. Validate CRL as per Section 6.3 of RFC 5280 # 9. decide on certificate authority name and AKID - if revocation_point["isPAA"] and not is_self_signed_certificate(crl_signer_certificate): - certificate_authority_name_b64 = get_subject_b64(paa_certificate_object) - certificate_akid = get_skid(paa_certificate_object) - elif crl_signer_delegator_cert: - certificate_authority_name_b64 = get_subject_b64(crl_signer_delegator_cert) - certificate_akid = get_skid(crl_signer_delegator_cert) - else: - certificate_authority_name_b64 = get_subject_b64(crl_signer_certificate) - certificate_akid = get_skid(crl_signer_certificate) + certificate_authority_name_b64, certificate_akid_hex = get_certificate_authority_details( + crl_signer_certificate, crl_signer_delegator_cert, paa_certificate_object, revocation_point["isPAA"]) # validate issuer skid matchces with the one in revocation points - certificate_akid_hex = ''.join('{:02X}'.format(x) for x in certificate_akid) - - logging.debug(f"Certificate Authority Name: {certificate_authority_name_b64}") - logging.debug(f"Certificate AKID: {certificate_akid_hex}") logging.debug(f"revocation_point['issuerSubjectKeyID']: {revocation_point['issuerSubjectKeyID']}") if revocation_point["issuerSubjectKeyID"] != certificate_akid_hex: logging.warning("CRL Issuer Subject Key ID is not CRL Signer Subject Key ID, continue...") continue - serialnumber_list = [] # 10. Iterate through the Revoked Certificates List - for revoked_cert in crl_file: - try: - revoked_cert_issuer = revoked_cert.extensions.get_extension_for_oid( - x509.CRLEntryExtensionOID.CERTIFICATE_ISSUER).value.get_values_for_type(x509.DirectoryName).value - - if revoked_cert_issuer is not None: - # check if this really are the same thing - if revoked_cert_issuer != certificate_authority_name_b64: - logging.warning("CRL Issuer is not CRL File Issuer, continue...") - continue - except Exception: - logging.warning("certificateIssuer entry extension not found in CRL") - pass - - serialnumber_list.append(bytes(str('{:02X}'.format(revoked_cert.serial_number)), 'utf-8').decode('utf-8')) - - entry = { - "type": "revocation_set", - "issuer_subject_key_id": certificate_akid_hex, - "issuer_name": certificate_authority_name_b64, - "revoked_serial_numbers": serialnumber_list, - "crl_signer_cert": base64.b64encode(crl_signer_certificate.public_bytes(serialization.Encoding.DER)).decode('utf-8'), - } - - if crl_signer_delegator_cert: - entry["crl_signer_delegator"] = base64.b64encode( - crl_signer_delegator_cert.public_bytes(serialization.Encoding.DER)).decode('utf-8'), + entry = generate_revocation_set_from_crl(crl_file, crl_signer_certificate, + certificate_authority_name_b64, certificate_akid_hex, crl_signer_delegator_cert) logging.debug(f"Entry to append: {entry}") revocation_set.append(entry)