From 95473efd28d788a39b753c0119cf32e9482f32d3 Mon Sep 17 00:00:00 2001 From: Albin Antony Date: Fri, 24 May 2024 10:28:46 +0530 Subject: [PATCH] Fix #14: Support creating vc jwt with disclosure mapping where disclosure mapping contains limitDisclosure instead of limitedDesclosure --- sdjwt/sdjwt.py | 105 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/sdjwt/sdjwt.py b/sdjwt/sdjwt.py index 0eb5731..4897a84 100644 --- a/sdjwt/sdjwt.py +++ b/sdjwt/sdjwt.py @@ -523,6 +523,111 @@ def iterate_mapping(obj, path): return jwt_credential + sd_disclosures +def create_w3c_vc_jwt_with_disclosure_mapping_v2( + jti: str, + iss: str, + sub: str, + kid: str, + key: jwk.JWK, + credential_issuer: str, + credential_id: str, + credential_type: typing.List[str], + credential_context: typing.List[str], + credential_subject: dict, + credential_schema: typing.Optional[typing.Union[dict, typing.List[dict]]] = None, + credential_status: typing.Optional[dict] = None, + terms_of_use: typing.Optional[typing.Union[dict, typing.List[dict]]] = None, + disclosure_mapping: typing.Optional[dict] = None, +) -> str: + expiry_in_seconds = 3600 + issuance_epoch, issuance_8601 = ( + get_current_datetime_in_epoch_seconds_and_iso8601_format() + ) + expiration_epoch, expiration_8601 = ( + get_current_datetime_in_epoch_seconds_and_iso8601_format(expiry_in_seconds) + ) + _credentialSubject = {**credential_subject} + if disclosure_mapping: + disclosures = [] + + def calculate_sd(name, value): + _sd = [] + disclosure_base64 = create_disclosure_base64( + create_random_salt(32), key=name, value=value + ) + sd = create_sd_from_disclosure_base64(disclosure_base64) + disclosures.append(disclosure_base64) + _sd.append(sd) + return _sd + + def update_value(obj, path): + # Construct json path dot notation + dot_notation_path = ".".join(path) + + # Find matches for the json path + jp = parse(dot_notation_path) + matches = jp.find(obj) + + # Iterate through the matches and calculated sd + for match in matches: + sd = calculate_sd(str(match.path), match.value) + if isinstance(match.context.value, dict): + if not match.context.value.get("_sd"): + match.context.value.setdefault("_sd", sd) + del match.context.value[str(match.path)] + else: + match.context.value["_sd"].extend(sd) + del match.context.value[str(match.path)] + + def iterate_mapping(obj, path): + for key, value in obj.items(): + if isinstance(value, dict): + new_path = path + [f"'{key}'"] + # Check if limitDisclosure is present or not + if "limitDisclosure" in value and value["limitDisclosure"]: + update_value(_credentialSubject, new_path) + iterate_mapping(value, new_path) + + # Iterate through disclosure mapping + # and add sd to the corresponding field in the + # credential subject + iterate_mapping(disclosure_mapping, []) + + vc = { + "@context": credential_context, + "id": credential_id, + "type": credential_type, + "issuer": credential_issuer, + "issuanceDate": issuance_8601, + "validFrom": issuance_8601, + "expirationDate": expiration_8601, + "issued": issuance_8601, + **_credentialSubject, + } + if credential_schema: + vc["credentialSchema"] = credential_schema + if credential_status: + vc["credentialStatus"] = credential_status + if terms_of_use: + vc["termsOfUse"] = terms_of_use + + jwt_credential = create_jwt( + vc=vc, + jti=jti, + sub=sub, + iss=iss, + kid=kid, + key=key, + iat=issuance_epoch, + exp=expiration_epoch, + ) + sd_disclosures = "" + if disclosure_mapping: + sd_disclosures = "~" + "~".join(disclosures) + + return jwt_credential + sd_disclosures + + def decode_disclosure_base64(disclosure_base64: str): # Add padding back to the base64 string if needed while len(disclosure_base64) % 4 != 0: