Skip to content

Commit

Permalink
Fix L3-iGrant/api#689: Update to presentation submission for holder a…
Browse files Browse the repository at this point in the history
…nd submission evaluation for the verifier
  • Loading branch information
albinpa authored and georgepadayatti committed Nov 28, 2024
1 parent f279242 commit 2980faa
Showing 1 changed file with 274 additions and 8 deletions.
282 changes: 274 additions & 8 deletions sdjwt/pex.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ def match_credentials_for_sd_jwt(
credential_subject, key_mapping = (
decode_credential_sd_to_credential_subject_with_key_mapping(
disclosure_mapping=disclosure_mapping,
credential_subject=credential_decoded
credential_subject=credential_decoded,
)
)
credential = credential_decoded
Expand Down Expand Up @@ -802,23 +802,29 @@ def validate_vp_token(
"limit_disclosure", None
)
if "vc+sd-jwt" in credential_format:
matches = match_credentials(
is_valid, err = validate_vc_token_for_sd_jwt(
json.dumps(input_descriptor),
credentials=[json.dumps(vc_claims)],
credential_token=vc_token,
limit_disclosure=True if limit_disclosure else False,
)
if is_valid:
is_verified = True
else:
return False
else:
matches = match_credentials(
json.dumps(input_descriptor),
credentials=[json.dumps(vc_claims["vc"])],
)
if not matches or not matches[0]:
return False
else:
is_verified = True
if not matches or not matches[0]:
return False
else:
is_verified = True
if not is_verified:
return False
return True


def decode_credential_sd_to_credential_claims(
disclosure_mapping: dict, credential_subject: dict
) -> dict:
Expand All @@ -830,7 +836,9 @@ def replace_sd_with_credential_subject_attributes(sds: list, disclosure: dict):
for sd in sds:
disclosure_base64 = disclosure.get(sd)
if disclosure_base64:
key, value = decode_disclosure_base64(disclosure_base64=disclosure_base64)
key, value = decode_disclosure_base64(
disclosure_base64=disclosure_base64
)
credential_attribute[key] = value
return credential_attribute

Expand Down Expand Up @@ -866,3 +874,261 @@ def iterate_mapping(obj, path):

iterate_mapping(credential_subject, [])
return credential_subject["credentialSubject"]


def remove_and_add_matching_disclosures_to_token(
token: str, disclosure_mapping: dict
) -> str:
token_without_disclosures = token.split("~")[0]
disclosures = []
for sd, disclosure in disclosure_mapping.items():
disclosures.append(disclosure)
if len(disclosures) > 0:
token_with_disclosures = token_without_disclosures + "~" + "~".join(disclosures)
else:
token_with_disclosures = token_without_disclosures

return token_with_disclosures


def apply_json_path_for_sd_jwt(input_json_string, path, disclosure_mapping):
is_matched = False
try:
# Parse input JSON string
parsed_input = json.loads(input_json_string)
except json.JSONDecodeError as e:
return None, e, None

try:
# Parse JSON path string
jsonpath_expr = parse(path)
except Exception as e:
return None, e, None

disclosures = {}

for match in jsonpath_expr.find(parsed_input):
matches = match.value

if isinstance(matches, dict):
child_sd = matches.get("_sd")
if child_sd:
for sd in child_sd:
disclosures[sd] = disclosure_mapping.get(sd)
return True, disclosures, None, matches

path_parts = path.split(".")
length_of_path_parts = len(path_parts)
poped_parts = []
matches = None
while len(path_parts) >= (length_of_path_parts - 1) and len(path_parts) > 1:
poped_part = path_parts.pop() # Remove the last segment
parent_path = ".".join(path_parts)
parent_expr = parse(parent_path)
for match in parent_expr.find(parsed_input):
if isinstance(match.value, dict):
sds = match.value.get("_sd")
if sds:
for sd in sds:
disclosure_base64 = disclosure_mapping.get(sd)

key, value = decode_disclosure_base64(
disclosure_base64=disclosure_base64
)
if key == poped_part:
if len(poped_parts) > 0:
disclosure_value = None
if isinstance(value, dict):
for pp in poped_parts:
disclosure_value = value.get(pp)
if disclosure_value:
break
if disclosure_value:
is_matched = True
matches = disclosure_value
disclosures[sd] = disclosure_mapping.get(sd)
else:
is_matched = True
matches = value
disclosures[sd] = disclosure_mapping.get(sd)
poped_parts.append(poped_part)
return is_matched, disclosures, None, matches


def match_credentials_for_ietf_sd_jwt(
input_descriptor_json,
credentials,
limit_disclosure: bool,
):
# Deserialise input descriptor json string
try:
descriptor = json.loads(input_descriptor_json)
except json.JSONDecodeError as e:
return [], e

# To store the matched credentials
matched_credentials = []

# Iterate through each credential
for item in credentials:
for credential_id, credential_token in item.items():
disclosure_mapping = get_all_disclosures_with_sd_from_token(
token=credential_token
)
_, credential_decoded = decode_header_and_claims_in_jwt(credential_token)
credential = json.dumps(credential_decoded)
credential_matched = True
matched_disclosure_mapping = {}
# Iterate through fields specified in the constraints
for field_index, field in enumerate(descriptor["constraints"]["fields"]):

for path_index, path in enumerate(field["path"]):
is_matched, disclosures, err, matches = apply_json_path_for_sd_jwt(
credential, path, disclosure_mapping
)
if is_matched:
if "filter" in field:
try:
filter_bytes = json.dumps(field["filter"])
except (TypeError, ValueError) as e:
# Continue to next path, since filter has failed to serialise
continue

# Validate the matched JSON against the field's filter

if validate_json_schema(matches, filter_bytes) is not None:
credential_matched = False
break
matched_disclosure_mapping.update(disclosures)

if not is_matched:
credential_matched = False
break
if credential_matched:
if limit_disclosure:
token = remove_and_add_matching_disclosures_to_token(
token=credential_token,
disclosure_mapping=matched_disclosure_mapping,
)
matched_credential = {credential_id: token}
matched_credentials.append(matched_credential)
else:
matched_credential = {credential_id: credential_token}
matched_credentials.append(matched_credential)

return matched_credentials


def apply_json_path_for_validating_sd_jwt(input_json_string, path, disclosure_mapping):
is_matched = False
try:
# Parse input JSON string
parsed_input = json.loads(input_json_string)
except json.JSONDecodeError as e:
return None, e, None

try:
# Parse JSON path string
jsonpath_expr = parse(path)
except Exception as e:
return None, e, None

for match in jsonpath_expr.find(parsed_input):
matches = match.value

if isinstance(matches, dict):
child_sd = matches.get("_sd")
if child_sd:
for sd in child_sd:
disclosure = disclosure_mapping.get(sd)
if not disclosure:
return False, None, matches
return True, None, matches

path_parts = path.split(".")
length_of_path_parts = len(path_parts)
poped_parts = []
matches = None
while len(path_parts) >= (length_of_path_parts - 1) and len(path_parts) > 1:
poped_part = path_parts.pop() # Remove the last segment
parent_path = ".".join(path_parts)
parent_expr = parse(parent_path)
for match in parent_expr.find(parsed_input):
if isinstance(match.value, dict):
sds = match.value.get("_sd")
if sds:
for sd in sds:
disclosure_base64 = disclosure_mapping.get(sd)
if disclosure_base64:
key, value = decode_disclosure_base64(
disclosure_base64=disclosure_base64
)
if key == poped_part:
if len(poped_parts) > 0:
disclosure_value = None
if isinstance(value, dict):
for pp in poped_parts:
disclosure_value = value.get(pp)
if disclosure_value:
break
if disclosure_value:
is_matched = True
matches = disclosure_value
else:
is_matched = True
matches = value
poped_parts.append(poped_part)
return is_matched, None, matches


def validate_vc_token_for_sd_jwt(
input_descriptor_json,
credential_token: str,
limit_disclosure: bool,
):
# Deserialise input descriptor json string
try:
descriptor = json.loads(input_descriptor_json)
except json.JSONDecodeError as e:
return False, e

disclosure_mapping = get_all_disclosures_with_sd_from_token(token=credential_token)
_, credential_decoded = decode_header_and_claims_in_jwt(credential_token)
credential = json.dumps(credential_decoded)
credential_matched = True
# Iterate through fields specified in the constraints
for field_index, field in enumerate(descriptor["constraints"]["fields"]):

for path_index, path in enumerate(field["path"]):
is_matched, err, matches = apply_json_path_for_validating_sd_jwt(
credential, path, disclosure_mapping
)
if is_matched:
if "filter" in field:
try:
filter_bytes = json.dumps(field["filter"])
except (TypeError, ValueError) as e:
# Continue to next path, since filter has failed to serialise
continue

# Validate the matched JSON against the field's filter

if validate_json_schema(matches, filter_bytes) is not None:
credential_matched = False
break

if not is_matched:
credential_matched = False
break
if credential_matched:
if limit_disclosure:
return True, None
else:
sds = find_all_sd_values(credential_decoded)
for sd in sds:
disclosure_base64 = disclosure_mapping.get(sd)
if not disclosure_base64:
return False, None
return True, None
else:
return False, None

0 comments on commit 2980faa

Please sign in to comment.