diff --git a/sdjwt/pex.py b/sdjwt/pex.py index d1d67bd..7cb8dcc 100644 --- a/sdjwt/pex.py +++ b/sdjwt/pex.py @@ -1,10 +1,12 @@ -from jsonschema import exceptions, validate +from jsonschema import exceptions, validate, ValidationError import json import base64 -from typing import List, Dict, Union, Optional, Tuple +from typing import List, Dict, Union, Optional, Tuple, Any from pydantic import BaseModel from sdjwt.didkey import DIDKey from jwcrypto import jwk, jwt +from jsonpath_ng import jsonpath, parse +from dataclasses import dataclass class Field(BaseModel): @@ -306,3 +308,178 @@ def validate_vp_token_against_presentation_submission_and_presentation_definitio presentation_submission=presentation_submission ) verify_vp_token(vp_token=vp_token) + + +@dataclass +class MatchedPath: + path: str + index: int + value: Any + + +@dataclass +class MatchedField: + index: int + path: MatchedPath + + +@dataclass +class MatchedCredential: + index: int + fields: List[MatchedField] + + +def apply_json_path(input_json_string, path): + try: + # Parse input JSON string + parsed_input = json.loads(input_json_string) + except json.JSONDecodeError as e: + return None, e + + try: + # Parse JSON path string + jsonpath_expr = parse(path) + except Exception as e: + return None, e + + # Apply JSON path on input and get the matches + matches = [match.value for match in jsonpath_expr.find(parsed_input)] + return matches, None + + +def validate_json_schema(input_json_string, schema_string): + + try: + # Parse schema JSON string + schema = json.loads(schema_string) + except json.JSONDecodeError as e: + return e + + try: + # Validate JSON schema against the input JSON + validate(instance=input_json_string, schema=schema) + except ValidationError as e: + return e + + return None + + +def match_credentials( + input_descriptor_json, credentials +) -> Tuple[List[MatchedCredential], Optional[Exception]]: + # Deserialise input descriptor json string + try: + descriptor = json.loads(input_descriptor_json) + except json.JSONDecodeError as e: + return [], e + + # To store the matched credentials + matches = [] + + # Iterate through each credential + for credential_index, credential in enumerate(credentials): + + # Assume credential matches until proven otherwise + credential_matched = True + matched_fields = [] + + # Iterate through fields specified in the constraints + for field_index, field in enumerate(descriptor["constraints"]["fields"]): + + # Assume field matches until proven otherwise + field_matched = False + + # Iterate through JSON paths for the current field + for path_index, path in enumerate(field["path"]): + + # Apply JSON path on the credential + path_matches, err = apply_json_path(credential, path) + + if len(path_matches) > 0 and err is None: + if "filter" in field: + try: + filter_bytes = json.dumps(field["filter"]) + except (TypeError, ValueError) as e: + # Continue to next path, since filter has failed to serialise + continue + + # Validate the matched JSON against the field's filter + if ( + validate_json_schema(path_matches[0], filter_bytes) + is not None + ): + # Field doesn't match since validation failed + field_matched = False + break + + # Add the matched field to the list + field_matched = True + matched_fields.append( + MatchedField( + index=field_index, + path=MatchedPath( + path=path, index=path_index, value=path_matches[0] + ), + ) + ) + + if not field_matched: + # If any one field didn't match then move to next credential + credential_matched = False + break + + if credential_matched: + # All fields matched, then credential is matched + matches.append( + MatchedCredential(index=credential_index, fields=matched_fields) + ) + + return matches, None + + +def decode_base64(encoded_str): + decoded_bytes = base64.urlsafe_b64decode(encoded_str + "==") + return json.loads(decoded_bytes.decode("utf-8")) + + +def find_all_sd_values(data): + sd_values = [] + if isinstance(data, dict): + for key, value in data.items(): + if key == "_sd" and isinstance(value, list): + sd_values.extend(value) + else: + sd_values.extend(find_all_sd_values(value)) + elif isinstance(data, list): + for item in data: + sd_values.extend(find_all_sd_values(item)) + return sd_values + + +# Function to extract relevant disclosure values +def extract_disclosure_values(input_descriptor, credential, disclosure): + fields = input_descriptor["constraints"]["fields"] + sd_values = find_all_sd_values(credential["credentialSubject"]) + + matching_disclosures = [] + for field in fields: + path = field["path"][0] + key_to_match = path.split(".")[-1] + + for sd in sd_values: + if sd in disclosure: + decoded_value = decode_base64(disclosure[sd]) + if key_to_match == decoded_value[1]: + matching_disclosures.append(disclosure[sd]) + break + + return matching_disclosures + + +def update_disclosures_in_token(token: str, disclosures: list) -> str: + token_with_disclosures = token.split("~") + jwt_token = token_with_disclosures[:1][0] + sd_string = "~" + "~".join(disclosures) + + sd_jwt = jwt_token + sd_string + return sd_jwt diff --git a/sdjwt/tests/test_pex.py b/sdjwt/tests/test_pex.py new file mode 100644 index 0000000..3d91228 --- /dev/null +++ b/sdjwt/tests/test_pex.py @@ -0,0 +1,203 @@ +import unittest +import json +from unittest import IsolatedAsyncioTestCase +from sdjwt.pex import ( + match_credentials, + MatchedCredential, + MatchedField, + MatchedPath, + extract_disclosure_values, +) + + +class TestPEX(IsolatedAsyncioTestCase): + async def test_match_credentials(self): + input_descriptor = json.dumps( + { + "id": "ef91319b-81a5-4f71-a602-de3eacccb543", + "constraints": { + "limit_disclosure": "required", + "fields": [ + {"path": ["$.credentialSubject.identifier"]}, + {"path": ["$.credentialSubject.legalName"]}, + ], + }, + } + ) + credentials = [ + json.dumps( + { + "@context": ["https://www.w3.org/2018/credentials/v1"], + "credentialSchema": [ + { + "id": "https://api-conformance.ebsi.eu/trusted-schemas-registry/v2/schemas/z3MgUFUkb722uq4x3dv5yAJmnNmzDFeK5UC8x83QoeLJM", + "type": "FullJsonSchemaValidator2021", + } + ], + "credentialSubject": { + "id": "did:key:z2dmzD81cgPx8Vki7JbuuMmFYrWPgYoytykUZ3eyqht1j9KbnhVe3bugMJPbfkv9GcRq5ogEKVHwnudNt4jMaTMzAgQgM5Pd61nbJ3vGzjn5dqo4C9X1FTZ6RY1rv3dPg6ux6auTrWtBWtUby2KJ3BKfJmKtwAynWLNr8EqRnuz2nFPZbS", + "identifier": "123400-7899", + "legalName": "Bygg AB", + }, + "expirationDate": "2024-06-07T07:07:40Z", + "id": "urn:did:eb2ac148-4f07-492f-aaea-b75a2acc0f98", + "issuanceDate": "2024-06-07T06:07:40Z", + "issued": "2024-06-07T06:07:40Z", + "issuer": "did:key:z2dmzD81cgPx8Vki7JbuuMmFYrWPgYoytykUZ3eyqht1j9KbnhVe3bugMJPbfkv9GcRq5ogEKVHwnudNt4jMaTMzAgQgM5Pd61nbJ3vGzjn5dqo4C9X1FTZ6RY1rv3dPg6ux6auTrWtBWtUby2KJ3BKfJmKtwAynWLNr8EqRnuz2nFPZbS", + "type": ["VerifiableLegalPersonalIdentificationData"], + "validFrom": "2024-06-07T06:07:40Z", + } + ), + json.dumps( + { + "@context": ["https://www.w3.org/2018/credentials/v1"], + "credentialSchema": [ + { + "id": "https://api-conformance.ebsi.eu/trusted-schemas-registry/v2/schemas/z3MgUFUkb722uq4x3dv5yAJmnNmzDFeK5UC8x83QoeLJM", + "type": "FullJsonSchemaValidator2021", + } + ], + "credentialSubject": { + "id": "did:key:z2dmzD81cgPx8Vki7JbuuMmFYrWPgYoytykUZ3eyqht1j9KbnhVe3bugMJPbfkv9GcRq5ogEKVHwnudNt4jMaTMzAgQgM5Pd61nbJ3vGzjn5dqo4C9X1FTZ6RY1rv3dPg6ux6auTrWtBWtUby2KJ3BKfJmKtwAynWLNr8EqRnuz2nFPZbS", + "activity": "Construction Industry", + "legalForm": "Aktiebolag", + "legalStatus": "ACTIVE", + "name": "Bygg AB", + "orgNumber": "123400-7899", + "registeredAddress": { + "adminUnitLevel1": "SE", + "fullAddress": "Sveavägen 48, 111 34 Stockholm, Sweden", + "locatorDesignator": "48", + "postCode": "111 34", + "postName": "Stockholm", + "thoroughFare": "Sveavägen", + }, + "registrationDate": "2005-10-08", + }, + "expirationDate": "2024-06-07T12:52:04Z", + "id": "urn:did:f43432ff-6363-44a6-ba12-82e6c5b41c8a", + "issuanceDate": "2024-06-07T11:52:04Z", + "issued": "2024-06-07T11:52:04Z", + "issuer": "did:key:z2dmzD81cgPx8Vki7JbuuMmFYrWPgYoytykUZ3eyqht1j9KbnhVe3bugMJPbfkv9GcRq5ogEKVHwnudNt4jMaTMzAgQgM5Pd61nbJ3vGzjn5dqo4C9X1FTZ6RY1rv3dPg6ux6auTrWtBWtUby2KJ3BKfJmKtwAynWLNr8EqRnuz2nFPZbS", + "type": ["VerifiableCertificateOfRegistration"], + "validFrom": "2024-06-07T11:52:04Z", + } + ), + ] + + expected_matched_credentials = ( + [ + MatchedCredential( + index=0, + fields=[ + MatchedField( + index=0, + path=MatchedPath( + path="$.credentialSubject.identifier", + index=0, + value="123400-7899", + ), + ), + MatchedField( + index=1, + path=MatchedPath( + path="$.credentialSubject.legalName", + index=0, + value="Bygg AB", + ), + ), + ], + ) + ], + None, + ) + + matched_credentials = match_credentials( + input_descriptor_json=input_descriptor, credentials=credentials + ) + + condition_1 = matched_credentials == expected_matched_credentials + self.assert_( + condition_1, + "Expected matched credential doesn't match with result", + ) + + async def test_extract_disclosure_values(self): + credential = { + "@context": ["https://www.w3.org/2018/credentials/v1"], + "credentialSchema": [ + { + "id": "https://api-conformance.ebsi.eu/trusted-schemas-registry/v2/schemas/z3MgUFUkb722uq4x3dv5yAJmnNmzDFeK5UC8x83QoeLJM", + "type": "FullJsonSchemaValidator2021", + } + ], + "credentialSubject": { + "_sd": ["7sCYwjBINYYha3SbjxvLpdt8q-uUjcxA0HC5z2N15Vs"], + "activity": "test", + "id": "did:key:z2dmzD81cgPx8Vki7JbuuMmFYrWPgYoytykUZ3eyqht1j9KbnhVe3bugMJPbfkv9GcRq5ogEKVHwnudNt4jMaTMzAgQgM5Pd61nbJ3vGzjn5dqo4C9X1FTZ6RY1rv3dPg6ux6auTrWtBWtUby2KJ3BKfJmKtwAynWLNr8EqRnuz2nFPZbS", + "legalForm": "tt", + "name": "test", + "orgNumber": "1111", + "registeredAddress": { + "_sd": [ + "WgasKnzLW0ZxJ4tUg_INr0Qs51DLqda_A_JXadqM1Iw", + "UkbKFenSPl93IJ5H4QcreNahGQG_KNu0_OjbmUcyTu4", + "Fk1TPdjypTc-vwLSet2FrjTNevtZFqJIea_-RDyE1D4", + ], + "fullAddress": "wee6", + "postCode": "jko", + "postName": "dfgg", + }, + "registrationDate": "14-06-2024", + }, + "expirationDate": "2024-06-14T10:58:04Z", + "id": "urn:did:9b0757ae-450b-4775-b680-15ab0c4a83a0", + "issuanceDate": "2024-06-14T09:58:04Z", + "issued": "2024-06-14T09:58:04Z", + "issuer": "did:key:z2dmzD81cgPx8Vki7JbuuMmFYrWPgYoytykUZ3eyqht1j9KbnhVe3bugMJPbfkv9GcRq5ogEKVHwnudNt4jMaTMzAgQgM5Pd61nbJ3vGzjn5dqo4C9X1FTZ6RY1rv3dPg6ux6auTrWtBWtUby2KJ3BKfJmKtwAynWLNr8EqRnuz2nFPZbS", + "type": ["VerifiableCertificateOfRegistration"], + "validFrom": "2024-06-14T09:58:04Z", + } + + + disclosure = { + "7sCYwjBINYYha3SbjxvLpdt8q-uUjcxA0HC5z2N15Vs": "WyI1YjU2ZGU2ZjUzZWFmYjQ2NzAzYmRjMTU3NmE2MzE2ZDZlZmI5Mjk0ZDgyNzUyODI4NTI5ZDIzNTJhY2ZkNWZlIiwibGVnYWxTdGF0dXMiLCJ0ZXN0Il0", + "WgasKnzLW0ZxJ4tUg_INr0Qs51DLqda_A_JXadqM1Iw": "WyI1NDNkMWNiMmE5MGNkM2ExMmVjNmFlZDRmZTI1YjljY2RiNmU3MmI5N2I1YTZkNTI4OWYzM2ZlYTZiMzA2YWU0IiwiYWRtaW5Vbml0TGV2ZWwxIiwidHkiXQ", + "UkbKFenSPl93IJ5H4QcreNahGQG_KNu0_OjbmUcyTu4": "WyIwMThlN2VkYmRlNzkyMzhiNTAzNDBiMzNhNDM3ODRmYWFhYWViNTdlNWM4YzZlZjY5MTlmODM2MGZkMzdjMDRhIiwibG9jYXRvckRlc2lnbmF0b3IiLCJ0dTkiXQ", + "Fk1TPdjypTc-vwLSet2FrjTNevtZFqJIea_-RDyE1D4": "WyIyNTBjY2U3YzAyZDI5YzRhNGZmOTkzZmU1MDFiY2IzYWMxMDAyNmU0MjhhNGMyOTdjMDcwOTg0OGY4NGRjMTQ2IiwidGhvcm91Z2hGYXJlIiwidmJibiJd", + } + + input_descriptor = { + "id": "ef91319b-81a5-4f71-a602-de3eacccb543", + "constraints": { + "limit_disclosure": "required", + "fields": [ + {"path": ["$.credentialSubject.registeredAddress.adminUnitLevel1"]}, + {"path": ["$.credentialSubject.registeredAddress.locatorDesignator"]}, + {"path": ["$.credentialSubject.legalStatus"]}, + { + "path": ["$.credentialSubject.legalForm"], + "filter": {"type": "string", "const": "EKM"}, + }, + ], + }, + } + expected_disclosures = ['WyI1NDNkMWNiMmE5MGNkM2ExMmVjNmFlZDRmZTI1YjljY2RiNmU3MmI5N2I1YTZkNTI4OWYzM2ZlYTZiMzA2YWU0IiwiYWRtaW5Vbml0TGV2ZWwxIiwidHkiXQ', 'WyIwMThlN2VkYmRlNzkyMzhiNTAzNDBiMzNhNDM3ODRmYWFhYWViNTdlNWM4YzZlZjY5MTlmODM2MGZkMzdjMDRhIiwibG9jYXRvckRlc2lnbmF0b3IiLCJ0dTkiXQ', 'WyI1YjU2ZGU2ZjUzZWFmYjQ2NzAzYmRjMTU3NmE2MzE2ZDZlZmI5Mjk0ZDgyNzUyODI4NTI5ZDIzNTJhY2ZkNWZlIiwibGVnYWxTdGF0dXMiLCJ0ZXN0Il0'] + + disclosures = extract_disclosure_values(input_descriptor=input_descriptor,credential=credential,disclosure=disclosure) + + condition_1 = disclosures == expected_disclosures + self.assert_( + condition_1, + "Expected disclosures doesn't match with result", + ) + + condition_2 = disclosures[0] == expected_disclosures[0] + self.assert_( + condition_1, + "Expected disclosure doesn't match with result", + ) + + +if __name__ == "__main__": + unittest.main()