From 06f297e32ec39f4d62ade768a8629ece7043d4fc Mon Sep 17 00:00:00 2001 From: Albin Antony Date: Tue, 17 Dec 2024 15:54:01 +0530 Subject: [PATCH] Fix L3-iGrant/api#798: Support optional keybinding JWT in every presentation (holder/verifier) --- sdjwt/pex.py | 145 +++++++++++++++++++++++++++------------------------ 1 file changed, 77 insertions(+), 68 deletions(-) diff --git a/sdjwt/pex.py b/sdjwt/pex.py index d6ec556..17d0176 100644 --- a/sdjwt/pex.py +++ b/sdjwt/pex.py @@ -325,11 +325,17 @@ def process_jwt(token): # Check if the last part is an optional key binding JWT optional_keybinding = None - if len(parts) > 1 and '.' in parts[-1] and len(parts[-1].split('.')) == 3: - optional_keybinding = parts.pop() # Remove and assign the last part as keybinding + if token.endswith("~"): + optional_keybinding = None + elif len(parts) >= 1: + optional_keybinding = parts[-1] + if len(optional_keybinding.split('.')) != 3: + raise ValueError("Invalid Key binding jwt. Expected 'header.body.signature'.") + else: + optional_keybinding = None # The remaining parts are disclosures - disclosures = parts[:] + disclosures = parts[:-1] return header, body, signature, disclosures, optional_keybinding @@ -806,75 +812,78 @@ def validate_vp_token( presentation_definition: dict, ) -> bool: headers, claims = decode_header_and_claims_in_jwt(vp_token) - for descriptor in presentation_submission.get("descriptor_map"): - is_verified = False - if "path_nested" in descriptor: - format = descriptor["format"] - path = descriptor["path_nested"]["path"] - id = descriptor["id"] - # Parse the JSON data - jsonpath_expr = parse(path) - matches = jsonpath_expr.find(claims) - - # Extract the value - vc_token = matches[0].value if matches else None - - headers_encoded, claims_encoded, signature, disclosures, key_binding_jwt = process_jwt(vc_token) - - _ , vc_claims = decode_header_and_claims(headers_encoded,claims_encoded) - - - if vc_claims and format == "vc+sd-jwt": - if key_binding_jwt: - disclosures = "~".join(disclosures) - temp_vc_token = f"{headers_encoded}.{claims_encoded}.{signature}" - if disclosures: - temp_vc_token += f"~{disclosures}" - else: - temp_vc_token = vc_token - disclosure_mapping = get_all_disclosures_with_sd_from_token(temp_vc_token) - - credential_subject = create_credential_subject_for_sdjwt( - credential_subject=vc_claims, - disclosure_mapping=disclosure_mapping, - ) - vc_claims = credential_subject - elif vc_claims and format == "jwt_vc": - pass + try : + for descriptor in presentation_submission.get("descriptor_map"): + is_verified = False + if "path_nested" in descriptor: + format = descriptor["format"] + path = descriptor["path_nested"]["path"] + id = descriptor["id"] + # Parse the JSON data + jsonpath_expr = parse(path) + matches = jsonpath_expr.find(claims) + + # Extract the value + vc_token = matches[0].value if matches else None + + headers_encoded, claims_encoded, signature, disclosures, key_binding_jwt = process_jwt(vc_token) + + _ , vc_claims = decode_header_and_claims(headers_encoded,claims_encoded) + + + if vc_claims and format == "vc+sd-jwt": + if key_binding_jwt: + disclosures = "~".join(disclosures) + temp_vc_token = f"{headers_encoded}.{claims_encoded}.{signature}" + if disclosures: + temp_vc_token += f"~{disclosures}" + else: + temp_vc_token = vc_token + disclosure_mapping = get_all_disclosures_with_sd_from_token(temp_vc_token) - input_descriptors = json.loads(presentation_definition).get( - "input_descriptors" - ) - format_info = json.loads(presentation_definition).get("format") - for input_descriptor in input_descriptors: - credential_format = input_descriptor.get("format") - if not credential_format: - credential_format = format_info - if input_descriptor.get("id") == id: - limit_disclosure = input_descriptor.get("constraints").get( - "limit_disclosure", None + credential_subject = create_credential_subject_for_sdjwt( + credential_subject=vc_claims, + disclosure_mapping=disclosure_mapping, ) - if "vc+sd-jwt" in credential_format: - is_valid, err = validate_vc_token_for_sd_jwt( - json.dumps(input_descriptor), - credential_token=temp_vc_token, - limit_disclosure=True if limit_disclosure else False, - ) - if is_valid: - is_verified = True - else: - return False - else: - matches = match_credentials( - json.dumps(input_descriptor), - credentials=[json.dumps(vc_claims["vc"])], + vc_claims = credential_subject + elif vc_claims and format == "jwt_vc": + pass + + input_descriptors = json.loads(presentation_definition).get( + "input_descriptors" + ) + format_info = json.loads(presentation_definition).get("format") + for input_descriptor in input_descriptors: + credential_format = input_descriptor.get("format") + if not credential_format: + credential_format = format_info + if input_descriptor.get("id") == id: + limit_disclosure = input_descriptor.get("constraints").get( + "limit_disclosure", None ) - if not matches or not matches[0]: - return False + if "vc+sd-jwt" in credential_format: + is_valid, err = validate_vc_token_for_sd_jwt( + json.dumps(input_descriptor), + credential_token=temp_vc_token, + limit_disclosure=True if limit_disclosure else False, + ) + if is_valid: + is_verified = True + else: + return False else: - is_verified = True - if not is_verified: - return False + matches = match_credentials( + json.dumps(input_descriptor), + credentials=[json.dumps(vc_claims["vc"])], + ) + if not matches or not matches[0]: + return False + else: + is_verified = True + if not is_verified: + return False + except ValueError as e: + return False return True