Skip to content

Commit

Permalink
Fix #17: Credential supported is returning incorrect values in well k…
Browse files Browse the repository at this point in the history
…nown endpoint
  • Loading branch information
albinpa authored and georgepadayatti committed May 9, 2024
1 parent 449acff commit 099fe05
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 40 deletions.
5 changes: 5 additions & 0 deletions eudi_wallet/ebsi/entry_points/server/routes/v2/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@
from eudi_wallet.ebsi.usecases.v2.organisation.list_verification_request_usecase import (
ListVerificationRequestUsecase,
)
from eudi_wallet.ebsi.entry_points.server.v2_well_known import (
validate_credential_type_based_on_disclosure_mapping
)

config_routes = web.RouteTableDef()

Expand Down Expand Up @@ -576,6 +579,8 @@ async def handle_post_issue_credential(request: Request, context: V2RequestConte
)

else:
assert credential.get("type", []) is not None
credential = validate_credential_type_based_on_disclosure_mapping(credential=credential,disclosure_mapping=disclosure_mapping)
credential_offer = await context.legal_entity_service.issue_credential_with_disclosure_mapping(
issuance_mode=issue_credential_req.issuanceMode,
is_pre_authorised=issue_credential_req.isPreAuthorised,
Expand Down
10 changes: 6 additions & 4 deletions eudi_wallet/ebsi/entry_points/server/routes/v2/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,17 @@ async def handle_service_get_well_known_openid_credential_issuer_configuration(
request: Request,
context: V2RequestContext,
):
assert context.app_context.db_session is not None
organisation_id = request.match_info.get("organisationId")
if organisation_id is None:
raise web.HTTPBadRequest(reason="Invalid organisation id")

res = service_get_well_known_openid_credential_issuer_config(
context.app_context.domain,
organisation_id,
context.app_context.logger,
wallet_domain=context.app_context.domain,
organisation_id=organisation_id,
logger=context.app_context.logger,
session=context.app_context.db_session,
legal_entity_repository=context.organisation_repository,
data_agreement_repository=context.data_agreement_repository,
)
return web.json_response(res)

Expand Down
119 changes: 83 additions & 36 deletions eudi_wallet/ebsi/entry_points/server/v2_well_known.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,62 +5,45 @@
from eudi_wallet.ebsi.exceptions.application.organisation import (
LegalEntityNotFoundError,
)
from eudi_wallet.ebsi.models.v2.data_agreement import V2DataAgreementModel
from eudi_wallet.ebsi.usecases.v2.organisation.list_all_data_agreements_usecase import (
V2ListAllDataAgreementsUsecase,
)
from eudi_wallet.ebsi.repositories.v2.data_agreement import (
SqlAlchemyV2DataAgreementRepository,
from eudi_wallet.ebsi.repositories.v2.issue_credential_record import (
SqlAlchemyIssueCredentialRecordRepository,
)


def service_get_well_known_openid_credential_issuer_config(
wallet_domain: str,
organisation_id: str,
logger: Logger,
session: Session,
legal_entity_repository: SqlAlchemyOrganisationRepository,
data_agreement_repository: SqlAlchemyV2DataAgreementRepository,
):
# For additional fields like logo, background color, text color e.t.c
# Check https://openid.net/specs/openid-4-verifiable-credential-issuance-1_0.html#name-credential-issuer-metadata

issue_credential_repository = SqlAlchemyIssueCredentialRecordRepository(
session=session, logger=logger
)

assert issue_credential_repository is not None

with issue_credential_repository as credential_repo:

credential_offers = (
credential_repo.get_all_by_organisation_id_and_with_credential(
organisation_id=organisation_id
)
)

with legal_entity_repository as repo:
legal_entity = repo.get_by_id(id=organisation_id)

if legal_entity is None:
raise LegalEntityNotFoundError(
f"Legal entity with id {organisation_id} not found"
)

usecase = V2ListAllDataAgreementsUsecase(
dataagreement_repository=data_agreement_repository,
logger=logger,
credentials_supported = create_credential_supported_from_credential_offers(
credential_offers=credential_offers
)
data_agreements = usecase.execute(organisation_id=organisation_id)

credentials_supported = {}
for data_agreement in data_agreements:
if data_agreement.limitedDisclosure:
format = "vc+sd-jwt"
else:
format = "jwt_vc"
credential = {
"format": format,
"scope": data_agreement.purpose,
"cryptographic_binding_methods_supported": ["jwk"],
"cryptographic_suites_supported": ["ES256"],
"display": [
{
"name": data_agreement.purpose,
"locale": "en-GB",
"background_color": "#12107c",
"text_color": "#FFFFFF",
}
],
}

credentials_supported[data_agreement.purpose] = credential
# credentials_supported.append(credential)

openid_credential_issuer_config = {
"credential_issuer": f"{wallet_domain}/organisation/{organisation_id}/service",
Expand Down Expand Up @@ -126,3 +109,67 @@ def service_get_well_known_authn_openid_config(
"attester_signed_id_token",
],
}


def add_or_replace_suffix(credential_type, current_suffix, new_suffix):
# Check if the credential type ends with the current suffix
if credential_type.endswith(new_suffix):
if new_suffix == "SdJwt":
return credential_type
if credential_type.endswith(current_suffix):
# Remove the current suffix and add new suffix
credential_type = credential_type[: -len(current_suffix)] + new_suffix
elif not credential_type.endswith(new_suffix):
# Add the new suffix if it's not already present
credential_type += new_suffix
return credential_type


def validate_credential_type_based_on_disclosure_mapping(
credential: dict, disclosure_mapping: dict
) -> dict:
credential_types = credential.get("type", [])
credential_type = credential_types[-1]
if disclosure_mapping:
credential_type = add_or_replace_suffix(credential_type, "Jwt", "SdJwt")
credential_types[-1] = credential_type
else:
credential_type = add_or_replace_suffix(credential_type, "SdJwt", "Jwt")
credential_types[-1] = credential_type
credential["type"] = credential_types
return credential


def create_credential_supported_from_credential_offers(credential_offers: list) -> dict:
credentials_supported = {}
for credential_offer in credential_offers:
credential_types = credential_offer.credential.get("type", [])
if credential_types:
credential = validate_credential_type_based_on_disclosure_mapping(
credential=credential_offer.credential,
disclosure_mapping=credential_offer.disclosureMapping,
)
credential_types = credential.get("type", [])

if credential_offer.disclosureMapping:
format = "vc+sd-jwt"
else:
format = "jwt_vc"
credential_supported = {
"format": format,
"scope": credential_types[-1],
"cryptographic_binding_methods_supported": ["jwk"],
"cryptographic_suites_supported": ["ES256"],
"display": [
{
"name": credential_types[-1],
"locale": "en-GB",
"background_color": "#12107c",
"text_color": "#FFFFFF",
}
],
}

credentials_supported[credential_types[-1]] = credential_supported

return credentials_supported
13 changes: 13 additions & 0 deletions eudi_wallet/ebsi/repositories/v2/issue_credential_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,19 @@ def get_all_by_organisation_id(
)
.all()
)

def get_all_by_organisation_id_and_with_credential(
self, organisation_id: str
) -> List[IssueCredentialRecordModel]:
assert self.session is not None
return (
self.session.query(IssueCredentialRecordModel)
.filter(
IssueCredentialRecordModel.organisationId == organisation_id,
IssueCredentialRecordModel.dataAgreementId == None
)
.all()
)

def get_all_by_data_agreement_id(
self, data_agreement_id: str
Expand Down

0 comments on commit 099fe05

Please sign in to comment.