diff --git a/dsp_permissions_scripts/template.py b/dsp_permissions_scripts/template.py index e0907a91..c0e9ee73 100644 --- a/dsp_permissions_scripts/template.py +++ b/dsp_permissions_scripts/template.py @@ -15,27 +15,45 @@ def modify_oaps(oaps: list[Oap]) -> list[Oap]: + """Adapt this sample to your needs.""" for oap in oaps: oap.scope.CR.append(BuiltinGroup.SYSTEM_ADMIN) return oaps def modify_doaps(doaps: list[Doap]) -> list[Doap]: + """Adapt this sample to your needs.""" for doap in doaps: if doap.target.group in [BuiltinGroup.PROJECT_MEMBER.value, BuiltinGroup.PROJECT_ADMIN.value]: doap.scope = PUBLIC return doaps -def main() -> None: - """ - The main method assembles a sample call of all available high-level functions. - """ - load_dotenv() # set login credentials from .env file as environment variables - host = Hosts.get_host("test") - shortcode = "F18E" - token = login(host) +def update_oaps( + host: str, + shortcode: str, + token: str, +) -> None: + """Sample function to modify the Object Access Permissions of a project.""" + resource_oaps = get_all_resource_oaps_of_project( + shortcode=shortcode, + host=host, + token=token, + ) + resource_oaps_updated = modify_oaps(oaps=resource_oaps) + apply_updated_oaps_on_server( + resource_oaps=resource_oaps_updated, + host=host, + token=token, + ) + +def update_doaps( + host: str, + shortcode: str, + token: str, +) -> None: + """Sample function to modify the Default Object Access Permissions of a project.""" project_doaps = get_doaps_of_project( host=host, shortcode=shortcode, @@ -52,15 +70,28 @@ def main() -> None: host=host, token=token, ) - resource_oaps = get_all_resource_oaps_of_project( - shortcode=shortcode, + + +def main() -> None: + """ + The main function provides you with 2 sample functions: + one to update the Object Access Permissions of a project, + and one to update the Default Object Access Permissions of a project. + Both must first be adapted to your needs. + """ + load_dotenv() # set login credentials from .env file as environment variables + host = Hosts.get_host("test") + shortcode = "F18E" + token = login(host) + + update_oaps( host=host, + shortcode=shortcode, token=token, ) - resource_oaps_updated = modify_oaps(oaps=resource_oaps) - apply_updated_oaps_on_server( - resource_oaps=resource_oaps_updated, + update_doaps( host=host, + shortcode=shortcode, token=token, ) diff --git a/dsp_permissions_scripts/utils/authentication.py b/dsp_permissions_scripts/utils/authentication.py index 516b0e77..46414f11 100644 --- a/dsp_permissions_scripts/utils/authentication.py +++ b/dsp_permissions_scripts/utils/authentication.py @@ -3,7 +3,7 @@ import requests -def __get_token(host: str, email: str, pw: str) -> str: +def _get_token(host: str, email: str, pw: str) -> str: """ requests an access token from the API, provided host, email and password. """ @@ -15,7 +15,7 @@ def __get_token(host: str, email: str, pw: str) -> str: return token -def __get_login_credentials(host: str) -> tuple[str, str]: +def _get_login_credentials(host: str) -> tuple[str, str]: """ Retrieve user email and password from the environment variables. In case of localhost, return the default email/password for localhost. @@ -41,8 +41,8 @@ def login(host: str) -> str: Returns: token: access token """ - user, pw = __get_login_credentials(host) - token = __get_token(host, user, pw) + user, pw = _get_login_credentials(host) + token = _get_token(host, user, pw) return token diff --git a/dsp_permissions_scripts/utils/doap_get.py b/dsp_permissions_scripts/utils/doap_get.py index 577636e3..8863c3fb 100644 --- a/dsp_permissions_scripts/utils/doap_get.py +++ b/dsp_permissions_scripts/utils/doap_get.py @@ -14,7 +14,7 @@ logger = get_logger(__name__) -def __filter_doaps_by_target( +def _filter_doaps_by_target( doaps: list[Doap], target: DoapTargetType, ) -> list[Doap]: @@ -34,7 +34,7 @@ def __filter_doaps_by_target( return filtered_doaps -def __get_all_doaps_of_project( +def _get_all_doaps_of_project( project_iri: str, host: str, token: str, @@ -87,12 +87,12 @@ def get_doaps_of_project( shortcode=shortcode, host=host, ) - doaps = __get_all_doaps_of_project( + doaps = _get_all_doaps_of_project( project_iri=project_iri, host=host, token=token, ) - filtered_doaps = __filter_doaps_by_target( + filtered_doaps = _filter_doaps_by_target( doaps=doaps, target=target, ) diff --git a/dsp_permissions_scripts/utils/doap_set.py b/dsp_permissions_scripts/utils/doap_set.py index a1794e0e..fe298d0d 100644 --- a/dsp_permissions_scripts/utils/doap_set.py +++ b/dsp_permissions_scripts/utils/doap_set.py @@ -14,7 +14,7 @@ logger = get_logger(__name__) -def __update_doap_scope( +def _update_doap_scope( doap_iri: str, scope: PermissionScope, host: str, @@ -34,7 +34,7 @@ def __update_doap_scope( return new_doap -def __log_and_print_doap_update( +def _log_and_print_doap_update( doap: Doap, state: Literal["before", "after"], ) -> None: @@ -64,12 +64,12 @@ def apply_updated_doaps_on_server( heading = f"{get_timestamp()}: Updating {len(doaps)} DOAPs on {host}..." print(f"\n{heading}\n{'=' * len(heading)}\n") for d in doaps: - __log_and_print_doap_update(doap=d, state="before") - new_doap = __update_doap_scope( + _log_and_print_doap_update(doap=d, state="before") + new_doap = _update_doap_scope( doap_iri=d.doap_iri, scope=d.scope, host=host, token=token, ) - __log_and_print_doap_update(doap=new_doap, state="after") + _log_and_print_doap_update(doap=new_doap, state="after") print(f"{get_timestamp()}: All DOAPs have been updated.") diff --git a/dsp_permissions_scripts/utils/oap.py b/dsp_permissions_scripts/utils/oap.py index 8c982325..a0df5673 100644 --- a/dsp_permissions_scripts/utils/oap.py +++ b/dsp_permissions_scripts/utils/oap.py @@ -13,76 +13,68 @@ logger = get_logger(__name__) -def apply_updated_oaps_on_server( - resource_oaps: list[Oap], - host: str, - token: str, -) -> None: - """Applies object access permissions on a DSP server.""" - logger.info("******* Applying updated object access permissions on server *******") - print(f"{get_timestamp()}: ******* Applying updated object access permissions on server *******") - for index, resource_oap in enumerate(resource_oaps): - msg = f"Updating permissions of resource {index + 1}/{len(resource_oaps)}: {resource_oap.object_iri}..." - logger.info("=====") - logger.info(msg) - print(f"{get_timestamp()}: {msg}") - __update_permissions_for_resource_and_values( - resource_iri=resource_oap.object_iri, - scope=resource_oap.scope, - host=host, - token=token, - ) - logger.info(f"Updated permissions of resource {resource_oap.object_iri} and its values.") - -def __update_permissions_for_resource_and_values( - resource_iri: str, - scope: PermissionScope, - host: str, - token: str, -) -> None: +def _get_value_iris(resource: dict[str, Any]) -> list[ValueUpdate]: """ - Updates the permissions for the given resource and its values. + Returns a list of values that have permissions and hence should be updated. """ - resource = __get_resource(resource_iri, host, token) - lmd = __get_lmd(resource) - type_ = __get_type(resource) - context = __get_context(resource) - values = __get_value_iris(resource) - update_permissions_for_resource(resource_iri, lmd, type_, context, scope, host, token) - for v in values: - __update_permissions_for_value(resource_iri, v, type_, context, scope, host, token) + res: list[ValueUpdate] = [] + for k, v in resource.items(): + if k in {"@id", "@type", "@context", "rdfs:label"}: + continue + match v: + case { + "@id": id_, + "@type": type_, + **properties, + } if "/values/" in id_ and "knora-api:hasPermissions" in properties: + res.append(ValueUpdate(k, id_, type_)) + case _: + continue + return res -def update_permissions_for_resource( +def _get_resource( resource_iri: str, - lmd: str | None, - type_: str, - context: dict[str, str], - scope: PermissionScope, host: str, token: str, -) -> None: +) -> dict[str, Any]: """ - Updates the permissions for the given resource. + Requests the resource with the given IRI from the API. """ - payload = { - "@id": resource_iri, - "@type": type_, - "knora-api:hasPermissions": create_string_from_scope(scope), - "@context": context, - } - if lmd: - payload["knora-api:lastModificationDate"] = lmd + iri = quote_plus(resource_iri, safe="") protocol = get_protocol(host) - url = f"{protocol}://{host}/v2/resources" + url = f"{protocol}://{host}/v2/resources/{iri}" headers = {"Authorization": f"Bearer {token}"} - response = requests.put(url, headers=headers, json=payload, timeout=5) + response = requests.get(url, headers=headers, timeout=5) assert response.status_code == 200 - logger.info(f"Updated permissions of resource {resource_iri}") + data: dict[str, Any] = response.json() + return data + +def _get_lmd(resource: dict[str, Any]) -> str | None: + """ + Gets last modification date from a resource JSON-LD dict. + """ + return resource.get("knora-api:lastModificationDate") -def __update_permissions_for_value( + +def _get_type(resource: dict[str, Any]) -> str: + """ + Gets the type from a resource JSON-LD dict.""" + t: str = resource["@type"] + return t + + +def _get_context(resource: dict[str, Any]) -> dict[str, str]: + """ + Gets the context object from a resource JSON-LD dict. + """ + c: dict[str, str] = resource["@context"] + return c + + +def _update_permissions_for_value( resource_iri: str, value: ValueUpdate, resource_type: str, @@ -124,61 +116,86 @@ def __update_permissions_for_value( logger.info(f"Updated permissions of resource {resource_iri}, value {value.value_iri}") -def __get_value_iris(resource: dict[str, Any]) -> list[ValueUpdate]: - """ - Returns a list of values that have permissions and hence should be updated. - """ - res: list[ValueUpdate] = [] - for k, v in resource.items(): - if k in {"@id", "@type", "@context", "rdfs:label"}: - continue - match v: - case { - "@id": id_, - "@type": type_, - **properties, - } if "/values/" in id_ and "knora-api:hasPermissions" in properties: - res.append(ValueUpdate(k, id_, type_)) - case _: - continue - return res - - -def __get_resource( +def _update_permissions_for_resource( resource_iri: str, + lmd: str | None, + resource_type: str, + context: dict[str, str], + scope: PermissionScope, host: str, token: str, -) -> dict[str, Any]: +) -> None: """ - Requests the resource with the given IRI from the API. + Updates the permissions for the given resource. """ - iri = quote_plus(resource_iri, safe="") + payload = { + "@id": resource_iri, + "@type": resource_type, + "knora-api:hasPermissions": create_string_from_scope(scope), + "@context": context, + } + if lmd: + payload["knora-api:lastModificationDate"] = lmd protocol = get_protocol(host) - url = f"{protocol}://{host}/v2/resources/{iri}" + url = f"{protocol}://{host}/v2/resources" headers = {"Authorization": f"Bearer {token}"} - response = requests.get(url, headers=headers, timeout=5) + response = requests.put(url, headers=headers, json=payload, timeout=5) assert response.status_code == 200 - data: dict[str, Any] = response.json() - return data + logger.info(f"Updated permissions of resource {resource_iri}") -def __get_lmd(resource: dict[str, Any]) -> str | None: - """ - Gets last modification date from a resource JSON-LD dict. +def _update_permissions_for_resource_and_values( + resource_iri: str, + scope: PermissionScope, + host: str, + token: str, +) -> None: """ - return resource.get("knora-api:lastModificationDate") - - -def __get_type(resource: dict[str, Any]) -> str: + Updates the permissions for the given resource and its values. """ - Gets the type from a resource JSON-LD dict.""" - t: str = resource["@type"] - return t + resource = _get_resource(resource_iri, host, token) + lmd = _get_lmd(resource) + resource_type = _get_type(resource) + context = _get_context(resource) + values = _get_value_iris(resource) + _update_permissions_for_resource( + resource_iri=resource_iri, + lmd=lmd, + resource_type=resource_type, + context=context, + scope=scope, + host=host, + token=token, + ) + for v in values: + _update_permissions_for_value( + resource_iri=resource_iri, + value=v, + resource_type=resource_type, + context=context, + scope=scope, + host=host, + token=token, + ) -def __get_context(resource: dict[str, Any]) -> dict[str, str]: - """ - Gets the context object from a resource JSON-LD dict. - """ - c: dict[str, str] = resource["@context"] - return c +def apply_updated_oaps_on_server( + resource_oaps: list[Oap], + host: str, + token: str, +) -> None: + """Applies object access permissions on a DSP server.""" + logger.info("******* Applying updated object access permissions on server *******") + print(f"{get_timestamp()}: ******* Applying updated object access permissions on server *******") + for index, resource_oap in enumerate(resource_oaps): + msg = f"Updating permissions of resource {index + 1}/{len(resource_oaps)}: {resource_oap.object_iri}..." + logger.info("=====") + logger.info(msg) + print(f"{get_timestamp()}: {msg}") + _update_permissions_for_resource_and_values( + resource_iri=resource_oap.object_iri, + scope=resource_oap.scope, + host=host, + token=token, + ) + logger.info(f"Updated permissions of resource {resource_oap.object_iri} and its values.") diff --git a/dsp_permissions_scripts/utils/project.py b/dsp_permissions_scripts/utils/project.py index 600b2234..61a42a01 100644 --- a/dsp_permissions_scripts/utils/project.py +++ b/dsp_permissions_scripts/utils/project.py @@ -10,62 +10,20 @@ logger = get_logger(__name__) -def get_project_iri_by_shortcode(shortcode: str, host: str) -> str: - """ - Retrieves the IRI of a project by its shortcode. - """ - protocol = get_protocol(host) - url = f"{protocol}://{host}/admin/projects/shortcode/{shortcode}" - response = requests.get(url, timeout=5) - assert response.status_code == 200 - iri: str = response.json()["project"]["id"] - return iri - - -def get_all_resource_oaps_of_project( - shortcode: str, - host: str, - token: str, -) -> list[Oap]: - logger.info(f"******* Getting all resource OAPs of project {shortcode} *******") - print(f"{get_timestamp()}: ******* Getting all resource OAPs of project {shortcode} *******") - project_iri = get_project_iri_by_shortcode( - shortcode=shortcode, - host=host, - ) - all_resource_oaps = [] - resclass_iris = __get_all_resource_class_iris_of_project( - project_iri=project_iri, - host=host, - token=token, - ) - for resclass_iri in resclass_iris: - resource_oaps = __get_all_resource_oaps_of_resclass( - host=host, - resclass_iri=resclass_iri, - project_iri=project_iri, - token=token, - ) - all_resource_oaps.extend(resource_oaps) - logger.info(f"Retrieved a TOTAL of {len(all_resource_oaps)} resource OAPs of project {shortcode}.") - print(f"{get_timestamp()}: Retrieved a TOTAL of {len(all_resource_oaps)} resource OAPs of project {shortcode}.") - return all_resource_oaps - - -def __get_all_resource_class_iris_of_project( +def _get_all_resource_class_iris_of_project( project_iri: str, host: str, token: str, ) -> list[str]: logger.info(f"Getting all resource class IRIs of project {project_iri}...") - project_onto_iris = __get_onto_iris_of_project( + project_onto_iris = _get_onto_iris_of_project( project_iri=project_iri, host=host, token=token, ) all_class_iris = [] for onto_iri in project_onto_iris: - class_iris = __get_class_iris_of_onto( + class_iris = _get_class_iris_of_onto( host=host, onto_iri=onto_iri, token=token, @@ -75,7 +33,7 @@ def __get_all_resource_class_iris_of_project( return all_class_iris -def __get_onto_iris_of_project( +def _get_onto_iris_of_project( project_iri: str, host: str, token: str, @@ -90,7 +48,7 @@ def __get_onto_iris_of_project( return project_onto_iris -def __get_class_iris_of_onto( +def _get_class_iris_of_onto( host: str, onto_iri: str, token: str, @@ -103,16 +61,16 @@ def __get_class_iris_of_onto( all_entities = response.json()["@graph"] context = response.json()["@context"] class_ids = [c["@id"] for c in all_entities if c.get("knora-api:isResourceClass")] - class_iris = [__dereference_prefix(class_id, context) for class_id in class_ids] + class_iris = [_dereference_prefix(class_id, context) for class_id in class_ids] return class_iris -def __dereference_prefix(identifier: str, context: dict[str, str]) -> str: +def _dereference_prefix(identifier: str, context: dict[str, str]) -> str: prefix, actual_id = identifier.split(":") return context[prefix] + actual_id -def __get_all_resource_oaps_of_resclass( +def _get_all_resource_oaps_of_resclass( host: str, resclass_iri: str, project_iri: str, @@ -127,7 +85,7 @@ def __get_all_resource_oaps_of_resclass( more = True while more: logger.info(f"Getting page {page}...") - more, iris = __get_next_page( + more, iris = _get_next_page( protocol=protocol, host=host, resclass_iri=resclass_iri, @@ -141,7 +99,7 @@ def __get_all_resource_oaps_of_resclass( return resources -def __get_next_page( +def _get_next_page( protocol: str, host: str, resclass_iri: str, @@ -175,3 +133,45 @@ def __get_next_page( else: # there are no more resources return False, [] + + +def get_project_iri_by_shortcode(shortcode: str, host: str) -> str: + """ + Retrieves the IRI of a project by its shortcode. + """ + protocol = get_protocol(host) + url = f"{protocol}://{host}/admin/projects/shortcode/{shortcode}" + response = requests.get(url, timeout=5) + assert response.status_code == 200 + iri: str = response.json()["project"]["id"] + return iri + + +def get_all_resource_oaps_of_project( + shortcode: str, + host: str, + token: str, +) -> list[Oap]: + logger.info(f"******* Getting all resource OAPs of project {shortcode} *******") + print(f"{get_timestamp()}: ******* Getting all resource OAPs of project {shortcode} *******") + project_iri = get_project_iri_by_shortcode( + shortcode=shortcode, + host=host, + ) + all_resource_oaps = [] + resclass_iris = _get_all_resource_class_iris_of_project( + project_iri=project_iri, + host=host, + token=token, + ) + for resclass_iri in resclass_iris: + resource_oaps = _get_all_resource_oaps_of_resclass( + host=host, + resclass_iri=resclass_iri, + project_iri=project_iri, + token=token, + ) + all_resource_oaps.extend(resource_oaps) + logger.info(f"Retrieved a TOTAL of {len(all_resource_oaps)} resource OAPs of project {shortcode}.") + print(f"{get_timestamp()}: Retrieved a TOTAL of {len(all_resource_oaps)} resource OAPs of project {shortcode}.") + return all_resource_oaps