diff --git a/TpmTestingPkg/TpmReplayPei/Tool/TpmReplay.py b/TpmTestingPkg/TpmReplayPei/Tool/TpmReplay.py index 36f855a0ef..d49fa880a8 100644 --- a/TpmTestingPkg/TpmReplayPei/Tool/TpmReplay.py +++ b/TpmTestingPkg/TpmReplayPei/Tool/TpmReplay.py @@ -23,7 +23,6 @@ import timeit import yaml -from edk2toollib.tpm.tpm2_defs import TPM_ALG_SHA256 from tcg_platform import ( PCR_0, PCR_7, @@ -37,11 +36,31 @@ VALUE_FROM_EVENT, ALG_FROM_VALUE, EVENT_FROM_VALUE, + EV_EFI_VARIABLE_DRIVER_CONFIG, + EV_EFI_VARIABLE_BOOT, + EV_EFI_VARIABLE_BOOT2, + EV_EFI_VARIABLE_AUTHORITY ) +from edk2toollib.tpm.tpm2_defs import ( + TPM_ALG_SHA256 +) + +from edk2toollib.uefi.authenticated_variables_structure_support import ( + EfiSignatureDatabase, + EfiSignatureDataFactory, + EfiSignatureDataEfiCertX509, +) + +from cryptography import x509 +from cryptography.hazmat.primitives import hashes + +from io import BytesIO + from enum import IntEnum from pathlib import PurePath from typing import Dict, Iterable, List, Union +from collections import defaultdict PROGRAM_NAME = "TPM Replay" @@ -579,6 +598,113 @@ def _add_digest(pcr: int, alg: int, digest: bytes, prehashed: bool = False) -> N return replay_event_log +def _decode_efi_signature_data_x509(data: bytes) -> Dict[str: Dict[str, str, str]]: + """Decodes an EFI signature data containing an X.509 certificate. + + Args: + data (bytes): The DER-encoded X.509 certificate data. + + Returns: + Dict[str: Dict[str, str, str]]: A dictionary containing the certificate's + subject, fingerprint, issuer, and not valid after date. + """ + cert = x509.load_der_x509_certificate(data) + cert_info = { + str(cert.subject): { + "fingerprint": cert.fingerprint(hashes.SHA1()).hex(":"), + "issuer": str(cert.issuer), + "not_valid_after_utc": str(cert.not_valid_after_utc), + } + } + + return cert_info + +def _decode_efi_signature_data_sha256(signature: EfiSignatureDatabase) -> Dict[str: str]: + """Decodes an EFI signature data containing a SHA256 hash. + + Args: + signature (EfiSignatureData): The EFI signature data. + + Returns: + Dict[str: str]: A dictionary containing the SHA256 hash and the signature owner. + """ + return {signature.signature_data.hex(): str(signature.signature_owner)} + +def _decode_variable_driver_config(data: bytes) -> Dict[str: Dict]: + """Decodes an EFI variable boot event. + + Args: + data (bytes): The binary data of the boot event. + + Returns: + Dict[str: Dict]: A dictionary containing the boot variable data. + """ + driver_config = {} + + with BytesIO(data) as fs: + + for signature_list in EfiSignatureDatabase(filestream=fs).EslList: + if not signature_list.signature_data_list: # discard empty EfiSignatureLists + continue + if signature_list.signature_type == EfiSignatureDataFactory.EFI_CERT_SHA256_GUID: + for signature in signature_list.signature_data_list: + driver_config.update(_decode_efi_signature_data_sha256(signature)) + elif signature_list.signature_type == EfiSignatureDataFactory.EFI_CERT_X509_GUID: + for sig_list in signature_list.signature_data_list: + driver_config.update( + _decode_efi_signature_data_x509(sig_list.signature_data)) + else: + raise ValueError(f"Unsupported signature type {signature_list.signature_type}") + + return driver_config + +def _decode_variable_authority(data: bytes) -> Dict[str: Dict]: + """Decodes an EFI variable authority event. + + Args: + data (bytes): The binary data of the authority event. + + Returns: + Dict[str: Dict]: A dictionary containing the authority variable data. + """ + authority_data = {} + + with BytesIO(data) as fs: + sig_list = EfiSignatureDataEfiCertX509( + decodefs=fs, decodesize=len(data)) + authority_data.update( + _decode_efi_signature_data_x509(sig_list.signature_data)) + + return authority_data + +def _decode_is_enabled(data) -> Dict[str, str]: + """ + Decodes an driver config event for the variable SecureBoot. + + Args: + data (bytes): The binary data of the driver config event. + + Returns: + Dict[str: str] : "True" if enabled "False" is disabled + """ + return {"enabled": "True" if ord(data) == 1 else "False" } + +def _decode_value_dispatch(event_type, name, data) -> Dict: + + dispatch_map = defaultdict(lambda: lambda data: {}, { + ("SecureBoot", EV_EFI_VARIABLE_DRIVER_CONFIG): _decode_is_enabled, + ("PK", EV_EFI_VARIABLE_DRIVER_CONFIG): _decode_variable_driver_config, + ("KEK", EV_EFI_VARIABLE_DRIVER_CONFIG): _decode_variable_driver_config, + ("db", EV_EFI_VARIABLE_DRIVER_CONFIG): _decode_variable_driver_config, + ("dbx", EV_EFI_VARIABLE_DRIVER_CONFIG): _decode_variable_driver_config, + ("PK", EV_EFI_VARIABLE_AUTHORITY): _decode_variable_authority, + ("KEK", EV_EFI_VARIABLE_AUTHORITY): _decode_variable_authority, + ("db", EV_EFI_VARIABLE_AUTHORITY): _decode_variable_authority + }) + func = dispatch_map[(name, event_type)] + + return func(data) + def _build_yaml_from_event_log( event_log: TpmReplayEventLog, @@ -593,8 +719,6 @@ def _build_yaml_from_event_log( a string or additional nested dictionaries. """ - import tcg_platform - yaml_data = {"events": []} logger.debug("Processing events...") @@ -615,10 +739,10 @@ def _build_yaml_from_event_log( event_data["data"] = {} if event.event_type in ( - tcg_platform.EV_EFI_VARIABLE_DRIVER_CONFIG, - tcg_platform.EV_EFI_VARIABLE_BOOT, - tcg_platform.EV_EFI_VARIABLE_BOOT2, - tcg_platform.EV_EFI_VARIABLE_AUTHORITY, + EV_EFI_VARIABLE_DRIVER_CONFIG, + EV_EFI_VARIABLE_BOOT, + EV_EFI_VARIABLE_BOOT2, + EV_EFI_VARIABLE_AUTHORITY, ): event_data["data"]["type"] = "variable" tcg_var = TcgUefiVariableData.from_binary(event.event) @@ -628,6 +752,8 @@ def _build_yaml_from_event_log( ] = tcg_var.unicode_name_length event_data["data"]["variable_data_length"] = tcg_var.variable_data_length event_data["data"]["variable_unicode_name"] = tcg_var.unicode_name + event_data["data"]["decode"] = _decode_value_dispatch( + event.event_type, tcg_var.unicode_name, tcg_var.variable_data) event_data["data"]["value"] = base64.b64encode( tcg_var.variable_data ).decode("utf-8")