Skip to content

Commit

Permalink
da_revocation: function out generation of revocation set from the crl
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhamdp committed Jan 14, 2025
1 parent 5bd63d5 commit 6280b89
Showing 1 changed file with 96 additions and 43 deletions.
139 changes: 96 additions & 43 deletions credentials/generate-revocation-set.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,97 @@ def fetch_crl_from_url(url: str, timeout: int) -> x509.CertificateRevocationList
logging.error('Failed to fetch a valid CRL')


def generate_revocation_set_from_crl(crl_file: x509.CertificateRevocationList,
crl_signer_certificate: x509.Certificate,
certificate_authority_name_b64: str,
certificate_akid_hex: str,
crl_signer_delegator_cert: x509.Certificate) -> dict:
"""Generate a revocation set from a CRL file.
Args:
crl_file: The CRL object containing revoked certificates
crl_signer_certificate: The certificate object used to sign the CRL
certificate_authority_name_b64: Base64 encoded issuer name
certificate_akid_hex: Hex encoded Authority Key Identifier
crl_signer_delegator_cert: crl signer delegator certificate object
Returns:
dict: A dictionary containing the revocation set data with fields:
- type: "revocation_set"
- issuer_subject_key_id: Authority Key Identifier (hex)
- issuer_name: Issuer name (base64)
- revoked_serial_numbers: List of revoked serial numbers
- crl_signer_cert: CRL signer certificate (base64 DER)
- crl_signer_delegator: Optional delegator certificate (base64 DER)
"""
serialnumber_list = []

for revoked_cert in crl_file:
try:
revoked_cert_issuer = revoked_cert.extensions.get_extension_for_oid(
x509.CRLEntryExtensionOID.CERTIFICATE_ISSUER).value.get_values_for_type(x509.DirectoryName).value

if revoked_cert_issuer is not None:
# check if this really are the same thing
if revoked_cert_issuer != certificate_authority_name_b64:
logging.warning("CRL Issuer is not CRL File Issuer, continue...")
continue
except Exception:
pass

serialnumber_list.append(bytes(str('{:02X}'.format(revoked_cert.serial_number)), 'utf-8').decode('utf-8'))

entry = {
"type": "revocation_set",
"issuer_subject_key_id": certificate_akid_hex,
"issuer_name": certificate_authority_name_b64,
"revoked_serial_numbers": serialnumber_list,
"crl_signer_cert": base64.b64encode(crl_signer_certificate.public_bytes(serialization.Encoding.DER)).decode('utf-8'),
}

if crl_signer_delegator_cert:
entry["crl_signer_delegator"] = base64.b64encode(
crl_signer_delegator_cert.public_bytes(serialization.Encoding.DER)).decode('utf-8')

return entry


# This is implemented as per point (9) in 6.2.4.1. Conceptual algorithm for revocation set construction
def get_certificate_authority_details(crl_signer_certificate: x509.Certificate,
crl_signer_delegator_cert: x509.Certificate,
paa_certificate_object: x509.Certificate,
is_paa: bool) -> tuple[str, str]:
"""Get certificate authority name and AKID based on certificate hierarchy.
Args:
crl_signer_certificate: The CRL signer certificate
crl_signer_delegator_cert: Optional delegator certificate
paa_certificate_object: Optional PAA certificate
is_paa: Whether this is a PAA certificate
Returns:
tuple[str, str]: (certificate_authority_name_b64, certificate_akid_hex)
"""
if is_paa and not is_self_signed_certificate(crl_signer_certificate):
cert_for_details = paa_certificate_object
logging.debug("Using PAA certificate for details")
elif crl_signer_delegator_cert:
cert_for_details = crl_signer_delegator_cert
logging.debug("Using CRL Signer Delegator certificate for details")
else:
cert_for_details = crl_signer_certificate
logging.debug("Using CRL Signer certificate for details")

certificate_authority_name_b64 = get_subject_b64(cert_for_details)
certificate_akid = get_skid(cert_for_details)
certificate_akid_hex = ''.join('{:02X}'.format(x) for x in certificate_akid)

logging.debug(f"Certificate Authority Name: {certificate_authority_name_b64}")
logging.debug(f"Certificate AKID: {certificate_akid_hex}")

return certificate_authority_name_b64, certificate_akid_hex


class DCLDClient:
'''
A client for interacting with DCLD using either the REST API or command line interface (CLI).
Expand Down Expand Up @@ -325,7 +416,7 @@ def get_issuer_cert(self, cert: x509.Certificate) -> Optional[x509.Certificate]:

return issuer_certificate_object

def get_revocations_points_by_skid(self, issuer_subject_key_id) -> list[dict]:
def get_revocations_points_by_skid(self, issuer_subject_key_id: str) -> list[dict]:
'''
Get revocation points by subject key ID
Expand Down Expand Up @@ -364,7 +455,6 @@ def get_revocations_points_by_skid(self, issuer_subject_key_id) -> list[dict]:
help='Determines the verbosity of script output')
def main(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool, use_test_net_http: bool, output: str, log_level: str):
"""Tool to construct revocation set from DCL"""

logging.basicConfig(
level=log_level,
format='%(asctime)s %(name)s %(levelname)-7s %(message)s',
Expand Down Expand Up @@ -467,56 +557,19 @@ def main(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool
# TODO: 8. Validate CRL as per Section 6.3 of RFC 5280

# 9. decide on certificate authority name and AKID
if revocation_point["isPAA"] and not is_self_signed_certificate(crl_signer_certificate):
certificate_authority_name_b64 = get_subject_b64(paa_certificate_object)
certificate_akid = get_skid(paa_certificate_object)
elif crl_signer_delegator_cert:
certificate_authority_name_b64 = get_subject_b64(crl_signer_delegator_cert)
certificate_akid = get_skid(crl_signer_delegator_cert)
else:
certificate_authority_name_b64 = get_subject_b64(crl_signer_certificate)
certificate_akid = get_skid(crl_signer_certificate)
certificate_authority_name_b64, certificate_akid_hex = get_certificate_authority_details(
crl_signer_certificate, crl_signer_delegator_cert, paa_certificate_object, revocation_point["isPAA"])

# validate issuer skid matchces with the one in revocation points
certificate_akid_hex = ''.join('{:02X}'.format(x) for x in certificate_akid)

logging.debug(f"Certificate Authority Name: {certificate_authority_name_b64}")
logging.debug(f"Certificate AKID: {certificate_akid_hex}")
logging.debug(f"revocation_point['issuerSubjectKeyID']: {revocation_point['issuerSubjectKeyID']}")

if revocation_point["issuerSubjectKeyID"] != certificate_akid_hex:
logging.warning("CRL Issuer Subject Key ID is not CRL Signer Subject Key ID, continue...")
continue

serialnumber_list = []
# 10. Iterate through the Revoked Certificates List
for revoked_cert in crl_file:
try:
revoked_cert_issuer = revoked_cert.extensions.get_extension_for_oid(
x509.CRLEntryExtensionOID.CERTIFICATE_ISSUER).value.get_values_for_type(x509.DirectoryName).value

if revoked_cert_issuer is not None:
# check if this really are the same thing
if revoked_cert_issuer != certificate_authority_name_b64:
logging.warning("CRL Issuer is not CRL File Issuer, continue...")
continue
except Exception:
logging.warning("certificateIssuer entry extension not found in CRL")
pass

serialnumber_list.append(bytes(str('{:02X}'.format(revoked_cert.serial_number)), 'utf-8').decode('utf-8'))

entry = {
"type": "revocation_set",
"issuer_subject_key_id": certificate_akid_hex,
"issuer_name": certificate_authority_name_b64,
"revoked_serial_numbers": serialnumber_list,
"crl_signer_cert": base64.b64encode(crl_signer_certificate.public_bytes(serialization.Encoding.DER)).decode('utf-8'),
}

if crl_signer_delegator_cert:
entry["crl_signer_delegator"] = base64.b64encode(
crl_signer_delegator_cert.public_bytes(serialization.Encoding.DER)).decode('utf-8'),
entry = generate_revocation_set_from_crl(crl_file, crl_signer_certificate,
certificate_authority_name_b64, certificate_akid_hex, crl_signer_delegator_cert)
logging.debug(f"Entry to append: {entry}")
revocation_set.append(entry)

Expand Down

0 comments on commit 6280b89

Please sign in to comment.