Skip to content

Commit

Permalink
Fix #16: Support match_credentials to match the credential using inpu…
Browse files Browse the repository at this point in the history
…t_descriptor and extract_disclosure_values using input_descriptor
  • Loading branch information
albinpa authored and georgepadayatti committed Oct 22, 2024
1 parent 95473ef commit 4aa4862
Show file tree
Hide file tree
Showing 2 changed files with 382 additions and 2 deletions.
181 changes: 179 additions & 2 deletions sdjwt/pex.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from jsonschema import exceptions, validate
from jsonschema import exceptions, validate, ValidationError
import json
import base64
from typing import List, Dict, Union, Optional, Tuple
from typing import List, Dict, Union, Optional, Tuple, Any
from pydantic import BaseModel
from sdjwt.didkey import DIDKey
from jwcrypto import jwk, jwt
from jsonpath_ng import jsonpath, parse
from dataclasses import dataclass


class Field(BaseModel):
Expand Down Expand Up @@ -306,3 +308,178 @@ def validate_vp_token_against_presentation_submission_and_presentation_definitio
presentation_submission=presentation_submission
)
verify_vp_token(vp_token=vp_token)


@dataclass
class MatchedPath:
path: str
index: int
value: Any


@dataclass
class MatchedField:
index: int
path: MatchedPath


@dataclass
class MatchedCredential:
index: int
fields: List[MatchedField]


def apply_json_path(input_json_string, path):
try:
# Parse input JSON string
parsed_input = json.loads(input_json_string)
except json.JSONDecodeError as e:
return None, e

try:
# Parse JSON path string
jsonpath_expr = parse(path)
except Exception as e:
return None, e

# Apply JSON path on input and get the matches
matches = [match.value for match in jsonpath_expr.find(parsed_input)]
return matches, None


def validate_json_schema(input_json_string, schema_string):

try:
# Parse schema JSON string
schema = json.loads(schema_string)
except json.JSONDecodeError as e:
return e

try:
# Validate JSON schema against the input JSON
validate(instance=input_json_string, schema=schema)
except ValidationError as e:
return e

return None


def match_credentials(
input_descriptor_json, credentials
) -> Tuple[List[MatchedCredential], Optional[Exception]]:
# Deserialise input descriptor json string
try:
descriptor = json.loads(input_descriptor_json)
except json.JSONDecodeError as e:
return [], e

# To store the matched credentials
matches = []

# Iterate through each credential
for credential_index, credential in enumerate(credentials):

# Assume credential matches until proven otherwise
credential_matched = True
matched_fields = []

# Iterate through fields specified in the constraints
for field_index, field in enumerate(descriptor["constraints"]["fields"]):

# Assume field matches until proven otherwise
field_matched = False

# Iterate through JSON paths for the current field
for path_index, path in enumerate(field["path"]):

# Apply JSON path on the credential
path_matches, err = apply_json_path(credential, path)

if len(path_matches) > 0 and err is None:
if "filter" in field:
try:
filter_bytes = json.dumps(field["filter"])
except (TypeError, ValueError) as e:
# Continue to next path, since filter has failed to serialise
continue

# Validate the matched JSON against the field's filter
if (
validate_json_schema(path_matches[0], filter_bytes)
is not None
):
# Field doesn't match since validation failed
field_matched = False
break

# Add the matched field to the list
field_matched = True
matched_fields.append(
MatchedField(
index=field_index,
path=MatchedPath(
path=path, index=path_index, value=path_matches[0]
),
)
)

if not field_matched:
# If any one field didn't match then move to next credential
credential_matched = False
break

if credential_matched:
# All fields matched, then credential is matched
matches.append(
MatchedCredential(index=credential_index, fields=matched_fields)
)

return matches, None


def decode_base64(encoded_str):
decoded_bytes = base64.urlsafe_b64decode(encoded_str + "==")
return json.loads(decoded_bytes.decode("utf-8"))


def find_all_sd_values(data):
sd_values = []
if isinstance(data, dict):
for key, value in data.items():
if key == "_sd" and isinstance(value, list):
sd_values.extend(value)
else:
sd_values.extend(find_all_sd_values(value))
elif isinstance(data, list):
for item in data:
sd_values.extend(find_all_sd_values(item))
return sd_values


# Function to extract relevant disclosure values
def extract_disclosure_values(input_descriptor, credential, disclosure):
fields = input_descriptor["constraints"]["fields"]
sd_values = find_all_sd_values(credential["credentialSubject"])

matching_disclosures = []
for field in fields:
path = field["path"][0]
key_to_match = path.split(".")[-1]

for sd in sd_values:
if sd in disclosure:
decoded_value = decode_base64(disclosure[sd])
if key_to_match == decoded_value[1]:
matching_disclosures.append(disclosure[sd])
break

return matching_disclosures


def update_disclosures_in_token(token: str, disclosures: list) -> str:
token_with_disclosures = token.split("~")
jwt_token = token_with_disclosures[:1][0]
sd_string = "~" + "~".join(disclosures)

sd_jwt = jwt_token + sd_string
return sd_jwt
203 changes: 203 additions & 0 deletions sdjwt/tests/test_pex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
import unittest
import json
from unittest import IsolatedAsyncioTestCase
from sdjwt.pex import (
match_credentials,
MatchedCredential,
MatchedField,
MatchedPath,
extract_disclosure_values,
)


class TestPEX(IsolatedAsyncioTestCase):
async def test_match_credentials(self):
input_descriptor = json.dumps(
{
"id": "ef91319b-81a5-4f71-a602-de3eacccb543",
"constraints": {
"limit_disclosure": "required",
"fields": [
{"path": ["$.credentialSubject.identifier"]},
{"path": ["$.credentialSubject.legalName"]},
],
},
}
)
credentials = [
json.dumps(
{
"@context": ["https://www.w3.org/2018/credentials/v1"],
"credentialSchema": [
{
"id": "https://api-conformance.ebsi.eu/trusted-schemas-registry/v2/schemas/z3MgUFUkb722uq4x3dv5yAJmnNmzDFeK5UC8x83QoeLJM",
"type": "FullJsonSchemaValidator2021",
}
],
"credentialSubject": {
"id": "did:key:z2dmzD81cgPx8Vki7JbuuMmFYrWPgYoytykUZ3eyqht1j9KbnhVe3bugMJPbfkv9GcRq5ogEKVHwnudNt4jMaTMzAgQgM5Pd61nbJ3vGzjn5dqo4C9X1FTZ6RY1rv3dPg6ux6auTrWtBWtUby2KJ3BKfJmKtwAynWLNr8EqRnuz2nFPZbS",
"identifier": "123400-7899",
"legalName": "Bygg AB",
},
"expirationDate": "2024-06-07T07:07:40Z",
"id": "urn:did:eb2ac148-4f07-492f-aaea-b75a2acc0f98",
"issuanceDate": "2024-06-07T06:07:40Z",
"issued": "2024-06-07T06:07:40Z",
"issuer": "did:key:z2dmzD81cgPx8Vki7JbuuMmFYrWPgYoytykUZ3eyqht1j9KbnhVe3bugMJPbfkv9GcRq5ogEKVHwnudNt4jMaTMzAgQgM5Pd61nbJ3vGzjn5dqo4C9X1FTZ6RY1rv3dPg6ux6auTrWtBWtUby2KJ3BKfJmKtwAynWLNr8EqRnuz2nFPZbS",
"type": ["VerifiableLegalPersonalIdentificationData"],
"validFrom": "2024-06-07T06:07:40Z",
}
),
json.dumps(
{
"@context": ["https://www.w3.org/2018/credentials/v1"],
"credentialSchema": [
{
"id": "https://api-conformance.ebsi.eu/trusted-schemas-registry/v2/schemas/z3MgUFUkb722uq4x3dv5yAJmnNmzDFeK5UC8x83QoeLJM",
"type": "FullJsonSchemaValidator2021",
}
],
"credentialSubject": {
"id": "did:key:z2dmzD81cgPx8Vki7JbuuMmFYrWPgYoytykUZ3eyqht1j9KbnhVe3bugMJPbfkv9GcRq5ogEKVHwnudNt4jMaTMzAgQgM5Pd61nbJ3vGzjn5dqo4C9X1FTZ6RY1rv3dPg6ux6auTrWtBWtUby2KJ3BKfJmKtwAynWLNr8EqRnuz2nFPZbS",
"activity": "Construction Industry",
"legalForm": "Aktiebolag",
"legalStatus": "ACTIVE",
"name": "Bygg AB",
"orgNumber": "123400-7899",
"registeredAddress": {
"adminUnitLevel1": "SE",
"fullAddress": "Sveavägen 48, 111 34 Stockholm, Sweden",
"locatorDesignator": "48",
"postCode": "111 34",
"postName": "Stockholm",
"thoroughFare": "Sveavägen",
},
"registrationDate": "2005-10-08",
},
"expirationDate": "2024-06-07T12:52:04Z",
"id": "urn:did:f43432ff-6363-44a6-ba12-82e6c5b41c8a",
"issuanceDate": "2024-06-07T11:52:04Z",
"issued": "2024-06-07T11:52:04Z",
"issuer": "did:key:z2dmzD81cgPx8Vki7JbuuMmFYrWPgYoytykUZ3eyqht1j9KbnhVe3bugMJPbfkv9GcRq5ogEKVHwnudNt4jMaTMzAgQgM5Pd61nbJ3vGzjn5dqo4C9X1FTZ6RY1rv3dPg6ux6auTrWtBWtUby2KJ3BKfJmKtwAynWLNr8EqRnuz2nFPZbS",
"type": ["VerifiableCertificateOfRegistration"],
"validFrom": "2024-06-07T11:52:04Z",
}
),
]

expected_matched_credentials = (
[
MatchedCredential(
index=0,
fields=[
MatchedField(
index=0,
path=MatchedPath(
path="$.credentialSubject.identifier",
index=0,
value="123400-7899",
),
),
MatchedField(
index=1,
path=MatchedPath(
path="$.credentialSubject.legalName",
index=0,
value="Bygg AB",
),
),
],
)
],
None,
)

matched_credentials = match_credentials(
input_descriptor_json=input_descriptor, credentials=credentials
)

condition_1 = matched_credentials == expected_matched_credentials
self.assert_(
condition_1,
"Expected matched credential doesn't match with result",
)

async def test_extract_disclosure_values(self):
credential = {
"@context": ["https://www.w3.org/2018/credentials/v1"],
"credentialSchema": [
{
"id": "https://api-conformance.ebsi.eu/trusted-schemas-registry/v2/schemas/z3MgUFUkb722uq4x3dv5yAJmnNmzDFeK5UC8x83QoeLJM",
"type": "FullJsonSchemaValidator2021",
}
],
"credentialSubject": {
"_sd": ["7sCYwjBINYYha3SbjxvLpdt8q-uUjcxA0HC5z2N15Vs"],
"activity": "test",
"id": "did:key:z2dmzD81cgPx8Vki7JbuuMmFYrWPgYoytykUZ3eyqht1j9KbnhVe3bugMJPbfkv9GcRq5ogEKVHwnudNt4jMaTMzAgQgM5Pd61nbJ3vGzjn5dqo4C9X1FTZ6RY1rv3dPg6ux6auTrWtBWtUby2KJ3BKfJmKtwAynWLNr8EqRnuz2nFPZbS",
"legalForm": "tt",
"name": "test",
"orgNumber": "1111",
"registeredAddress": {
"_sd": [
"WgasKnzLW0ZxJ4tUg_INr0Qs51DLqda_A_JXadqM1Iw",
"UkbKFenSPl93IJ5H4QcreNahGQG_KNu0_OjbmUcyTu4",
"Fk1TPdjypTc-vwLSet2FrjTNevtZFqJIea_-RDyE1D4",
],
"fullAddress": "wee6",
"postCode": "jko",
"postName": "dfgg",
},
"registrationDate": "14-06-2024",
},
"expirationDate": "2024-06-14T10:58:04Z",
"id": "urn:did:9b0757ae-450b-4775-b680-15ab0c4a83a0",
"issuanceDate": "2024-06-14T09:58:04Z",
"issued": "2024-06-14T09:58:04Z",
"issuer": "did:key:z2dmzD81cgPx8Vki7JbuuMmFYrWPgYoytykUZ3eyqht1j9KbnhVe3bugMJPbfkv9GcRq5ogEKVHwnudNt4jMaTMzAgQgM5Pd61nbJ3vGzjn5dqo4C9X1FTZ6RY1rv3dPg6ux6auTrWtBWtUby2KJ3BKfJmKtwAynWLNr8EqRnuz2nFPZbS",
"type": ["VerifiableCertificateOfRegistration"],
"validFrom": "2024-06-14T09:58:04Z",
}


disclosure = {
"7sCYwjBINYYha3SbjxvLpdt8q-uUjcxA0HC5z2N15Vs": "WyI1YjU2ZGU2ZjUzZWFmYjQ2NzAzYmRjMTU3NmE2MzE2ZDZlZmI5Mjk0ZDgyNzUyODI4NTI5ZDIzNTJhY2ZkNWZlIiwibGVnYWxTdGF0dXMiLCJ0ZXN0Il0",
"WgasKnzLW0ZxJ4tUg_INr0Qs51DLqda_A_JXadqM1Iw": "WyI1NDNkMWNiMmE5MGNkM2ExMmVjNmFlZDRmZTI1YjljY2RiNmU3MmI5N2I1YTZkNTI4OWYzM2ZlYTZiMzA2YWU0IiwiYWRtaW5Vbml0TGV2ZWwxIiwidHkiXQ",
"UkbKFenSPl93IJ5H4QcreNahGQG_KNu0_OjbmUcyTu4": "WyIwMThlN2VkYmRlNzkyMzhiNTAzNDBiMzNhNDM3ODRmYWFhYWViNTdlNWM4YzZlZjY5MTlmODM2MGZkMzdjMDRhIiwibG9jYXRvckRlc2lnbmF0b3IiLCJ0dTkiXQ",
"Fk1TPdjypTc-vwLSet2FrjTNevtZFqJIea_-RDyE1D4": "WyIyNTBjY2U3YzAyZDI5YzRhNGZmOTkzZmU1MDFiY2IzYWMxMDAyNmU0MjhhNGMyOTdjMDcwOTg0OGY4NGRjMTQ2IiwidGhvcm91Z2hGYXJlIiwidmJibiJd",
}

input_descriptor = {
"id": "ef91319b-81a5-4f71-a602-de3eacccb543",
"constraints": {
"limit_disclosure": "required",
"fields": [
{"path": ["$.credentialSubject.registeredAddress.adminUnitLevel1"]},
{"path": ["$.credentialSubject.registeredAddress.locatorDesignator"]},
{"path": ["$.credentialSubject.legalStatus"]},
{
"path": ["$.credentialSubject.legalForm"],
"filter": {"type": "string", "const": "EKM"},
},
],
},
}
expected_disclosures = ['WyI1NDNkMWNiMmE5MGNkM2ExMmVjNmFlZDRmZTI1YjljY2RiNmU3MmI5N2I1YTZkNTI4OWYzM2ZlYTZiMzA2YWU0IiwiYWRtaW5Vbml0TGV2ZWwxIiwidHkiXQ', 'WyIwMThlN2VkYmRlNzkyMzhiNTAzNDBiMzNhNDM3ODRmYWFhYWViNTdlNWM4YzZlZjY5MTlmODM2MGZkMzdjMDRhIiwibG9jYXRvckRlc2lnbmF0b3IiLCJ0dTkiXQ', 'WyI1YjU2ZGU2ZjUzZWFmYjQ2NzAzYmRjMTU3NmE2MzE2ZDZlZmI5Mjk0ZDgyNzUyODI4NTI5ZDIzNTJhY2ZkNWZlIiwibGVnYWxTdGF0dXMiLCJ0ZXN0Il0']

disclosures = extract_disclosure_values(input_descriptor=input_descriptor,credential=credential,disclosure=disclosure)

condition_1 = disclosures == expected_disclosures
self.assert_(
condition_1,
"Expected disclosures doesn't match with result",
)

condition_2 = disclosures[0] == expected_disclosures[0]
self.assert_(
condition_1,
"Expected disclosure doesn't match with result",
)


if __name__ == "__main__":
unittest.main()

0 comments on commit 4aa4862

Please sign in to comment.