Skip to content

Commit

Permalink
Fix L3-iGrant/api#798: Support optional keybinding JWT in every prese…
Browse files Browse the repository at this point in the history
…ntation (holder/verifier)
  • Loading branch information
albinpa authored and georgepadayatti committed Dec 17, 2024
1 parent f496f58 commit 06f297e
Showing 1 changed file with 77 additions and 68 deletions.
145 changes: 77 additions & 68 deletions sdjwt/pex.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,11 +325,17 @@ def process_jwt(token):

# Check if the last part is an optional key binding JWT
optional_keybinding = None
if len(parts) > 1 and '.' in parts[-1] and len(parts[-1].split('.')) == 3:
optional_keybinding = parts.pop() # Remove and assign the last part as keybinding
if token.endswith("~"):
optional_keybinding = None
elif len(parts) >= 1:
optional_keybinding = parts[-1]
if len(optional_keybinding.split('.')) != 3:
raise ValueError("Invalid Key binding jwt. Expected 'header.body.signature'.")
else:
optional_keybinding = None

# The remaining parts are disclosures
disclosures = parts[:]
disclosures = parts[:-1]

return header, body, signature, disclosures, optional_keybinding

Expand Down Expand Up @@ -806,75 +812,78 @@ def validate_vp_token(
presentation_definition: dict,
) -> bool:
headers, claims = decode_header_and_claims_in_jwt(vp_token)
for descriptor in presentation_submission.get("descriptor_map"):
is_verified = False
if "path_nested" in descriptor:
format = descriptor["format"]
path = descriptor["path_nested"]["path"]
id = descriptor["id"]
# Parse the JSON data
jsonpath_expr = parse(path)
matches = jsonpath_expr.find(claims)

# Extract the value
vc_token = matches[0].value if matches else None

headers_encoded, claims_encoded, signature, disclosures, key_binding_jwt = process_jwt(vc_token)

_ , vc_claims = decode_header_and_claims(headers_encoded,claims_encoded)


if vc_claims and format == "vc+sd-jwt":
if key_binding_jwt:
disclosures = "~".join(disclosures)
temp_vc_token = f"{headers_encoded}.{claims_encoded}.{signature}"
if disclosures:
temp_vc_token += f"~{disclosures}"
else:
temp_vc_token = vc_token
disclosure_mapping = get_all_disclosures_with_sd_from_token(temp_vc_token)

credential_subject = create_credential_subject_for_sdjwt(
credential_subject=vc_claims,
disclosure_mapping=disclosure_mapping,
)
vc_claims = credential_subject
elif vc_claims and format == "jwt_vc":
pass
try :
for descriptor in presentation_submission.get("descriptor_map"):
is_verified = False
if "path_nested" in descriptor:
format = descriptor["format"]
path = descriptor["path_nested"]["path"]
id = descriptor["id"]
# Parse the JSON data
jsonpath_expr = parse(path)
matches = jsonpath_expr.find(claims)

# Extract the value
vc_token = matches[0].value if matches else None

headers_encoded, claims_encoded, signature, disclosures, key_binding_jwt = process_jwt(vc_token)

_ , vc_claims = decode_header_and_claims(headers_encoded,claims_encoded)


if vc_claims and format == "vc+sd-jwt":
if key_binding_jwt:
disclosures = "~".join(disclosures)
temp_vc_token = f"{headers_encoded}.{claims_encoded}.{signature}"
if disclosures:
temp_vc_token += f"~{disclosures}"
else:
temp_vc_token = vc_token
disclosure_mapping = get_all_disclosures_with_sd_from_token(temp_vc_token)

input_descriptors = json.loads(presentation_definition).get(
"input_descriptors"
)
format_info = json.loads(presentation_definition).get("format")
for input_descriptor in input_descriptors:
credential_format = input_descriptor.get("format")
if not credential_format:
credential_format = format_info
if input_descriptor.get("id") == id:
limit_disclosure = input_descriptor.get("constraints").get(
"limit_disclosure", None
credential_subject = create_credential_subject_for_sdjwt(
credential_subject=vc_claims,
disclosure_mapping=disclosure_mapping,
)
if "vc+sd-jwt" in credential_format:
is_valid, err = validate_vc_token_for_sd_jwt(
json.dumps(input_descriptor),
credential_token=temp_vc_token,
limit_disclosure=True if limit_disclosure else False,
)
if is_valid:
is_verified = True
else:
return False
else:
matches = match_credentials(
json.dumps(input_descriptor),
credentials=[json.dumps(vc_claims["vc"])],
vc_claims = credential_subject
elif vc_claims and format == "jwt_vc":
pass

input_descriptors = json.loads(presentation_definition).get(
"input_descriptors"
)
format_info = json.loads(presentation_definition).get("format")
for input_descriptor in input_descriptors:
credential_format = input_descriptor.get("format")
if not credential_format:
credential_format = format_info
if input_descriptor.get("id") == id:
limit_disclosure = input_descriptor.get("constraints").get(
"limit_disclosure", None
)
if not matches or not matches[0]:
return False
if "vc+sd-jwt" in credential_format:
is_valid, err = validate_vc_token_for_sd_jwt(
json.dumps(input_descriptor),
credential_token=temp_vc_token,
limit_disclosure=True if limit_disclosure else False,
)
if is_valid:
is_verified = True
else:
return False
else:
is_verified = True
if not is_verified:
return False
matches = match_credentials(
json.dumps(input_descriptor),
credentials=[json.dumps(vc_claims["vc"])],
)
if not matches or not matches[0]:
return False
else:
is_verified = True
if not is_verified:
return False
except ValueError as e:
return False
return True


Expand Down

0 comments on commit 06f297e

Please sign in to comment.