From 2980faad0706f93bc801725b696c0957d640c24d Mon Sep 17 00:00:00 2001 From: Albin Antony Date: Thu, 28 Nov 2024 13:03:59 +0530 Subject: [PATCH] Fix L3-iGrant/api#689: Update to presentation submission for holder and submission evaluation for the verifier --- sdjwt/pex.py | 282 +++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 274 insertions(+), 8 deletions(-) diff --git a/sdjwt/pex.py b/sdjwt/pex.py index fd06668..5677174 100644 --- a/sdjwt/pex.py +++ b/sdjwt/pex.py @@ -644,7 +644,7 @@ def match_credentials_for_sd_jwt( credential_subject, key_mapping = ( decode_credential_sd_to_credential_subject_with_key_mapping( disclosure_mapping=disclosure_mapping, - credential_subject=credential_decoded + credential_subject=credential_decoded, ) ) credential = credential_decoded @@ -802,23 +802,29 @@ def validate_vp_token( "limit_disclosure", None ) if "vc+sd-jwt" in credential_format: - matches = match_credentials( + is_valid, err = validate_vc_token_for_sd_jwt( json.dumps(input_descriptor), - credentials=[json.dumps(vc_claims)], + credential_token=vc_token, + limit_disclosure=True if limit_disclosure else False, ) + if is_valid: + is_verified = True + else: + return False else: matches = match_credentials( json.dumps(input_descriptor), credentials=[json.dumps(vc_claims["vc"])], ) - if not matches or not matches[0]: - return False - else: - is_verified = True + if not matches or not matches[0]: + return False + else: + is_verified = True if not is_verified: return False return True + def decode_credential_sd_to_credential_claims( disclosure_mapping: dict, credential_subject: dict ) -> dict: @@ -830,7 +836,9 @@ def replace_sd_with_credential_subject_attributes(sds: list, disclosure: dict): for sd in sds: disclosure_base64 = disclosure.get(sd) if disclosure_base64: - key, value = decode_disclosure_base64(disclosure_base64=disclosure_base64) + key, value = decode_disclosure_base64( + disclosure_base64=disclosure_base64 + ) credential_attribute[key] = value return credential_attribute @@ -866,3 +874,261 @@ def iterate_mapping(obj, path): iterate_mapping(credential_subject, []) return credential_subject["credentialSubject"] + + +def remove_and_add_matching_disclosures_to_token( + token: str, disclosure_mapping: dict +) -> str: + token_without_disclosures = token.split("~")[0] + disclosures = [] + for sd, disclosure in disclosure_mapping.items(): + disclosures.append(disclosure) + if len(disclosures) > 0: + token_with_disclosures = token_without_disclosures + "~" + "~".join(disclosures) + else: + token_with_disclosures = token_without_disclosures + + return token_with_disclosures + + +def apply_json_path_for_sd_jwt(input_json_string, path, disclosure_mapping): + is_matched = False + try: + # Parse input JSON string + parsed_input = json.loads(input_json_string) + except json.JSONDecodeError as e: + return None, e, None + + try: + # Parse JSON path string + jsonpath_expr = parse(path) + except Exception as e: + return None, e, None + + disclosures = {} + + for match in jsonpath_expr.find(parsed_input): + matches = match.value + + if isinstance(matches, dict): + child_sd = matches.get("_sd") + if child_sd: + for sd in child_sd: + disclosures[sd] = disclosure_mapping.get(sd) + return True, disclosures, None, matches + + path_parts = path.split(".") + length_of_path_parts = len(path_parts) + poped_parts = [] + matches = None + while len(path_parts) >= (length_of_path_parts - 1) and len(path_parts) > 1: + poped_part = path_parts.pop() # Remove the last segment + parent_path = ".".join(path_parts) + parent_expr = parse(parent_path) + for match in parent_expr.find(parsed_input): + if isinstance(match.value, dict): + sds = match.value.get("_sd") + if sds: + for sd in sds: + disclosure_base64 = disclosure_mapping.get(sd) + + key, value = decode_disclosure_base64( + disclosure_base64=disclosure_base64 + ) + if key == poped_part: + if len(poped_parts) > 0: + disclosure_value = None + if isinstance(value, dict): + for pp in poped_parts: + disclosure_value = value.get(pp) + if disclosure_value: + break + if disclosure_value: + is_matched = True + matches = disclosure_value + disclosures[sd] = disclosure_mapping.get(sd) + else: + is_matched = True + matches = value + disclosures[sd] = disclosure_mapping.get(sd) + poped_parts.append(poped_part) + return is_matched, disclosures, None, matches + + +def match_credentials_for_ietf_sd_jwt( + input_descriptor_json, + credentials, + limit_disclosure: bool, +): + # Deserialise input descriptor json string + try: + descriptor = json.loads(input_descriptor_json) + except json.JSONDecodeError as e: + return [], e + + # To store the matched credentials + matched_credentials = [] + + # Iterate through each credential + for item in credentials: + for credential_id, credential_token in item.items(): + disclosure_mapping = get_all_disclosures_with_sd_from_token( + token=credential_token + ) + _, credential_decoded = decode_header_and_claims_in_jwt(credential_token) + credential = json.dumps(credential_decoded) + credential_matched = True + matched_disclosure_mapping = {} + # Iterate through fields specified in the constraints + for field_index, field in enumerate(descriptor["constraints"]["fields"]): + + for path_index, path in enumerate(field["path"]): + is_matched, disclosures, err, matches = apply_json_path_for_sd_jwt( + credential, path, disclosure_mapping + ) + if is_matched: + if "filter" in field: + try: + filter_bytes = json.dumps(field["filter"]) + except (TypeError, ValueError) as e: + # Continue to next path, since filter has failed to serialise + continue + + # Validate the matched JSON against the field's filter + + if validate_json_schema(matches, filter_bytes) is not None: + credential_matched = False + break + matched_disclosure_mapping.update(disclosures) + + if not is_matched: + credential_matched = False + break + if credential_matched: + if limit_disclosure: + token = remove_and_add_matching_disclosures_to_token( + token=credential_token, + disclosure_mapping=matched_disclosure_mapping, + ) + matched_credential = {credential_id: token} + matched_credentials.append(matched_credential) + else: + matched_credential = {credential_id: credential_token} + matched_credentials.append(matched_credential) + + return matched_credentials + + +def apply_json_path_for_validating_sd_jwt(input_json_string, path, disclosure_mapping): + is_matched = False + try: + # Parse input JSON string + parsed_input = json.loads(input_json_string) + except json.JSONDecodeError as e: + return None, e, None + + try: + # Parse JSON path string + jsonpath_expr = parse(path) + except Exception as e: + return None, e, None + + for match in jsonpath_expr.find(parsed_input): + matches = match.value + + if isinstance(matches, dict): + child_sd = matches.get("_sd") + if child_sd: + for sd in child_sd: + disclosure = disclosure_mapping.get(sd) + if not disclosure: + return False, None, matches + return True, None, matches + + path_parts = path.split(".") + length_of_path_parts = len(path_parts) + poped_parts = [] + matches = None + while len(path_parts) >= (length_of_path_parts - 1) and len(path_parts) > 1: + poped_part = path_parts.pop() # Remove the last segment + parent_path = ".".join(path_parts) + parent_expr = parse(parent_path) + for match in parent_expr.find(parsed_input): + if isinstance(match.value, dict): + sds = match.value.get("_sd") + if sds: + for sd in sds: + disclosure_base64 = disclosure_mapping.get(sd) + if disclosure_base64: + key, value = decode_disclosure_base64( + disclosure_base64=disclosure_base64 + ) + if key == poped_part: + if len(poped_parts) > 0: + disclosure_value = None + if isinstance(value, dict): + for pp in poped_parts: + disclosure_value = value.get(pp) + if disclosure_value: + break + if disclosure_value: + is_matched = True + matches = disclosure_value + else: + is_matched = True + matches = value + poped_parts.append(poped_part) + return is_matched, None, matches + + +def validate_vc_token_for_sd_jwt( + input_descriptor_json, + credential_token: str, + limit_disclosure: bool, +): + # Deserialise input descriptor json string + try: + descriptor = json.loads(input_descriptor_json) + except json.JSONDecodeError as e: + return False, e + + disclosure_mapping = get_all_disclosures_with_sd_from_token(token=credential_token) + _, credential_decoded = decode_header_and_claims_in_jwt(credential_token) + credential = json.dumps(credential_decoded) + credential_matched = True + # Iterate through fields specified in the constraints + for field_index, field in enumerate(descriptor["constraints"]["fields"]): + + for path_index, path in enumerate(field["path"]): + is_matched, err, matches = apply_json_path_for_validating_sd_jwt( + credential, path, disclosure_mapping + ) + if is_matched: + if "filter" in field: + try: + filter_bytes = json.dumps(field["filter"]) + except (TypeError, ValueError) as e: + # Continue to next path, since filter has failed to serialise + continue + + # Validate the matched JSON against the field's filter + + if validate_json_schema(matches, filter_bytes) is not None: + credential_matched = False + break + + if not is_matched: + credential_matched = False + break + if credential_matched: + if limit_disclosure: + return True, None + else: + sds = find_all_sd_values(credential_decoded) + for sd in sds: + disclosure_base64 = disclosure_mapping.get(sd) + if not disclosure_base64: + return False, None + return True, None + else: + return False, None