From 064b8bec4772faaacb69d712333d7b1c48b6904b Mon Sep 17 00:00:00 2001 From: Aastha Gupta <71313011+velotioaastha@users.noreply.github.com> Date: Wed, 8 May 2024 02:56:47 +0530 Subject: [PATCH] Wandb SCIM py wrapper (#519) * added python function as per the required apis. * added header authorization for apis. * added examples to test api modules. * Added code changes * reverted commented code * Added requirements file * update functions * add functional documentation and inline comments * test all functions * fixed review comments * Added changes to teams, custom role API methods * updated function names * fixed variable naming * added get api fixes after validation * fixed custom roles changes * updated custom-roles variable names * updated teams apis --------- Co-authored-by: Aastha Gupta --- wandb-scim/__init__.py | 0 wandb-scim/custom_roles.py | 243 ++++++++++++++++++++ wandb-scim/examples/custom_roles_example.py | 160 +++++++++++++ wandb-scim/examples/requirements.txt | 1 + wandb-scim/examples/teams_example.py | 118 ++++++++++ wandb-scim/examples/user_example.py | 125 ++++++++++ wandb-scim/teams.py | 181 +++++++++++++++ wandb-scim/users.py | 209 +++++++++++++++++ 8 files changed, 1037 insertions(+) create mode 100644 wandb-scim/__init__.py create mode 100644 wandb-scim/custom_roles.py create mode 100644 wandb-scim/examples/custom_roles_example.py create mode 100644 wandb-scim/examples/requirements.txt create mode 100644 wandb-scim/examples/teams_example.py create mode 100644 wandb-scim/examples/user_example.py create mode 100644 wandb-scim/teams.py create mode 100644 wandb-scim/users.py diff --git a/wandb-scim/__init__.py b/wandb-scim/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/wandb-scim/custom_roles.py b/wandb-scim/custom_roles.py new file mode 100644 index 00000000..b38c93c5 --- /dev/null +++ b/wandb-scim/custom_roles.py @@ -0,0 +1,243 @@ +import base64 +import json +import requests + +class CustomRole(object): + def __init__(self, base_url, username, api_key): + """ + Initializes the CustomRole object with username and API key. + + Args: + base_url (str): Host url. + username (str): The username for authentication. + api_key (str): The API key for authentication. + """ + # Encode the username and API key into a base64-encoded string for Basic Authentication + auth_str = f"{username}:{api_key}" + auth_bytes = auth_str.encode('ascii') + self.base_url = base_url + self.auth_token = base64.b64encode(auth_bytes).decode('ascii') + + # Create the authorization header for API requests + self.authorization_header = f"Basic {self.auth_token}" + + def create(self, request_payload): + """ + Creates a new custom role. + + Args: + request_payload (dict): The payload containing custom role data. + It should contain the following keys: + - 'permissions' (list[dict]): The permissions object for the custom role. + - 'inheritedFrom' (str): The inheritance information for the custom role. + + Returns: + str: A message indicating whether the custom role creation was successful or failed. + """ + print("Creating the custom role") + data = { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Role"], + "name": "Sample custom role", + "description": "A sample custom role for example", + "permissions": request_payload['permissions'], + "inheritedFrom": request_payload['inheritedFrom'] + } + headers = { + "Authorization": self.authorization_header, + "Content-Type": "application/json" + } + url = f"{self.base_url}/scim/Roles" + response = requests.post(url, json=data, headers=headers) + + if response.status_code == 201: + return "Custom role has been created!" + return f"Custom role creation failed. Status code: {response.status_code}" + + def get(self, role_id): + """ + Retrieves custom role details for role_id. + + Args: + role_id (str): role_id from the custom role. + + Returns: + str: A message containing custom role details or indicating failure. + """ + print("Getting the custom role") + + headers = { + "Authorization": self.authorization_header, + "Content-Type": "application/json" + } + url = f"{self.base_url}/scim/Roles/{role_id}" + response = requests.get(url, headers=headers) + + if response.status_code == 200: + return json.dumps(response.text, indent=4) + return f"Get custom role failed. Status code: {response.status_code}" + + def get_all(self): + """ + Retrieves details of all custom roles in the organization. + + Returns: + str: A message containing details of all custom roles or indicating failure. + """ + print("Getting all the custom roles in org") + + headers = { + "Authorization": self.authorization_header, + "Content-Type": "application/json" + } + url = f"{self.base_url}/scim/Roles" + response = requests.get(url, headers=headers) + + if response.status_code == 200: + return json.dumps(response.text, indent=4) + return f"Get all custom roles failed. Status code: {response.status_code}" + + def add_permissions(self, role_id, request_payload): + """ + Adds permission to a custom role. + + Args: + role_id (str): role_id from the custom role. + request_payload (dict): The payload containing permission information. + It should contain the following key: + - 'permissions' (list[dict]): The permissions object to be added to the custom role. + + Returns: + str: A message indicating whether the permission addition was successful or failed. + """ + print("Add permission to a custom role") + data = { + "schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"], + "Operations": [ + { + "op": "add", + "path": "permissions", + "value": request_payload['permissions'] + } + ] + } + url = f"{self.base_url}/scim/Roles/{role_id}" + headers = { + "Authorization": self.authorization_header, + "Content-Type": "application/json" + } + + response = requests.patch(url, json=data, headers=headers) + + if response.status_code == 200: + updated_data = response.json() # Get the updated resource data from the response + print("Updated Data:", updated_data) + return "Custom role updated successfully" + + elif response.status_code == 404: + return "Custom role not found" + else: + return f"Failed to update custom role. Status code: {response.status_code}" + + def remove_permissions(self, role_id, request_payload): + """ + Removes permission from a custom role. + + Args: + role_id (str): role_id from the custom role. + request_payload (dict): The payload containing permission information. + It should contain the following key: + - 'permissions' (list[dict]): The permissions Object to be removed from the custom role. + + Returns: + str: A message indicating whether the permission removal was successful or failed. + """ + print("Remove permission from a custom role") + data = { + "schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"], + "Operations": [ + { + "op": "remove", + "path": "permissions", + "value": request_payload['permissions'] + } + ] + } + headers = { + "Authorization": self.authorization_header, + "Content-Type": "application/json" + } + url = f"{self.base_url}/scim/Roles/{role_id}" + response = requests.patch(url, json=data, headers=headers) + + if response.status_code == 200: + updated_data = response.json() # Get the updated resource data from the response + print("Updated Data:", updated_data) + return "Custom role updated successfully" + + elif response.status_code == 404: + return "Custom role not found" + else: + return f"Failed to update custom role. Status code: {response.status_code}" + + def update(self, role_id, request_payload): + """ + Updates name and description of a custom role. + + Args: + role_id (str): role_id from the custom role. + request_payload (dict): The payload containing role information. + It should contain any of the following keys: + - 'roleName' (str): The name of the custom role. + - 'roleDescription' (str): The description of the custom role. + - 'inheritedFrom' (str): The inheritance information for the custom role. + + Returns: + str: A message indicating whether the custom role update was successful or failed. + """ + print("Update name and description of custom role") + data = { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Role"] + } + data.update(request_payload) + headers = { + "Authorization": self.authorization_header, + "Content-Type": "application/json" + } + url = f"{self.base_url}/scim/Roles/{role_id}" + response = requests.put(url, json=data, headers=headers) + + if response.status_code == 200: + updated_data = response.json() # Get the updated resource data from the response + print("Updated Data:", updated_data) + return "Custom role updated successfully" + + elif response.status_code == 404: + return "Custom role not found" + else: + return f"Failed to update custom role. Status code: {response.status_code}" + + def delete(self, role_id): + """ + Deletes a custom role. + + Args: + role_id (str): role_id from the custom role. + + Returns: + str: A message indicating whether the custom role deletion was successful or failed. + """ + print("Deleting custom role") + + headers = { + "Authorization": self.authorization_header, + "Content-Type": "application/json" + } + url = f"{self.base_url}/scim/Roles/{role_id}" + response = requests.delete(url, headers=headers) + + if response.status_code == 204: + return "Custom role deleted successfully" + elif response.status_code == 404: + return "Custom role not found" + else: + return f"Failed to delete custom role. Status code: {response.status_code}" diff --git a/wandb-scim/examples/custom_roles_example.py b/wandb-scim/examples/custom_roles_example.py new file mode 100644 index 00000000..df6700b4 --- /dev/null +++ b/wandb-scim/examples/custom_roles_example.py @@ -0,0 +1,160 @@ +# calling_module.py + +import requests +import sys +sys.path.append('../') +from custom_roles import CustomRole # Assuming CustomRole class is defined in custom_role.py + +def create_custom_role(custom_role, permission_object, inherited_from): + """ + Creates a new custom role. + + Args: + custom_role (CustomRole): An instance of the CustomRole class. + permission_object (list[dict]): Array of permission objects where each object has name field. + inherited_from (str): The source from which the custom role inherits permissions. + """ + try: + # Create a new custom role + create_custom_role_response = custom_role.create( + request_payload={ + "permissions": permission_object, + "inheritedFrom": inherited_from + } + ) + print(create_custom_role_response) + except requests.exceptions.RequestException as e: + print(f"Error occurred during API request: {str(e)}") + +def get_custom_role(custom_role, role_id): + """ + Retrieves details of a specific custom role. + + Args: + custom_role (CustomRole): An instance of the CustomRole class. + role_id (str): The ID of the custom role to retrieve. + """ + try: + # Get details of a custom role + get_custom_role_response = custom_role.get(role_id) + print(get_custom_role_response) + except requests.exceptions.RequestException as e: + print(f"Error occurred during API request: {str(e)}") + +def get_all_custom_roles(custom_role): + """ + Retrieves details of all custom roles. + + Args: + custom_role (CustomRole): An instance of the CustomRole class. + """ + try: + # Get all custom roles + get_all_custom_roles_response = custom_role.get_all() + print(get_all_custom_roles_response) + except requests.exceptions.RequestException as e: + print(f"Error occurred during API request: {str(e)}") + +def add_permissions(custom_role, role_id, permission_object): + """ + Adds permission to a custom role. + + Args: + custom_role (CustomRole): An instance of the CustomRole class. + role_id (str): The ID of the custom role to update. + permission_object (list[dict]): Array of permission objects where each object has name field. + """ + try: + # Add permission to a custom role + add_permissions_response = custom_role.add_permissions( + role_id, + request_payload={ + "permissions": permission_object + } + ) + print(add_permissions_response) + except requests.exceptions.RequestException as e: + print(f"Error occurred during API request: {str(e)}") + +def remove_permissions(custom_role, role_id, permission_object): + """ + Removes permission from a custom role. + + Args: + custom_role (CustomRole): An instance of the CustomRole class. + role_id (str): The ID of the custom role to update. + permission_object (list[dict]): Array of permission objects where each object has name field. + """ + try: + # Remove permission from a custom role + remove_permissions_response = custom_role.remove_permissions( + role_id, + request_payload={ + "permissions": permission_object + } + ) + print(remove_permissions_response) + except requests.exceptions.RequestException as e: + print(f"Error occurred during API request: {str(e)}") + +def update_custom_role(custom_role, role_id, update_params): + """ + Updates a custom role. + Args: + custom_role (CustomRole): An instance of the CustomRole class. + role_id (str): The ID of the custom role to update. + update_params (dict): a dict which can contain any of the following values. + name (str): The updated name of the custom role. + description (str): The updated description of the custom role. + inherited_from (str): The updated source from which the custom role inherits permissions. + """ + try: + request_payload={} + if('name' in update_params): + request_payload["name"] = update_params['name'] + if('description' in update_params): + request_payload["description"] = update_params['description'] + if('inherited_from' in update_params): + request_payload["inheritedFrom"] = update_params['inherited_from'] + # Update a custom role + update_custom_role_response = custom_role.update( + role_id, + request_payload + ) + print(update_custom_role_response) + except requests.exceptions.RequestException as e: + print(f"Error occurred during API request: {str(e)}") + +def delete_custom_role(custom_role, role_id): + """ + Deletes a custom role. + + Args: + custom_role (CustomRole): An instance of the CustomRole class. + role_id (str): The ID of the custom role to delete. + """ + try: + # Delete the custom role + delete_custom_role_response = custom_role.delete(role_id) + print(delete_custom_role_response) + except requests.exceptions.RequestException as e: + print(f"Error occurred during API request: {str(e)}") + +if __name__ == "__main__": + username = "your_username" + api_key = "your_api_key" + base_url = "https://example.com/api" # Base URL of the API + + # Instantiate the CustomRole class with your credentials + custom_role = CustomRole(base_url, username, api_key) + role_id = "abc" # Replace with actual role ID + permission_object = [{"name": "project:create"}, {"name": "project:update"}, {"name": "project:delete"}] # Replace with required permissions + + # Test Functions + get_all_custom_roles(custom_role) + # create_custom_role(custom_role, permission_object, "member") + # get_custom_role(custom_role, role_id) + # add_permissions(custom_role, role_id, permission_object) + # remove_permissions(custom_role, role_id, permission_object) + # update_custom_role(custom_role, role_id, {"name": "test-role1"}) + # delete_custom_role(custom_role, role_id) \ No newline at end of file diff --git a/wandb-scim/examples/requirements.txt b/wandb-scim/examples/requirements.txt new file mode 100644 index 00000000..077c95d8 --- /dev/null +++ b/wandb-scim/examples/requirements.txt @@ -0,0 +1 @@ +requests==2.31.0 \ No newline at end of file diff --git a/wandb-scim/examples/teams_example.py b/wandb-scim/examples/teams_example.py new file mode 100644 index 00000000..411095d6 --- /dev/null +++ b/wandb-scim/examples/teams_example.py @@ -0,0 +1,118 @@ +# calling_module.py + +import requests +import sys +sys.path.append('../') +from teams import Teams # Assuming the Teams class is defined in teams.py + +def create_team(teams, display_name, member_ids): + """ + Creates a new team. + + Args: + teams (Teams): An instance of the Teams class. + display_name (str): The display name of the new team. + member_ids (list): The ID of the member to be added to the team. + """ + try: + # Create a new team + member_object = [] + for id in member_ids: + member_object.append({"value": id}) + create_team_response = teams.create( + request_payload={ + "displayName": display_name, + "members": member_object + } + ) + print(create_team_response) + except requests.exceptions.RequestException as e: + print(f"Error occurred during API request: {str(e)}") + +def get_team(teams, team_id): + """ + Retrieves details of a specific team. + + Args: + teams (Teams): An instance of the Teams class. + team_id (str): The ID of the team to retrieve. + """ + try: + # Get team details + get_team_response = teams.get(team_id) + print(get_team_response) + except requests.exceptions.RequestException as e: + print(f"Error occurred during API request: {str(e)}") + +def get_all_teams(teams): + """ + Retrieves details of all teams in the organization. + + Args: + teams (Teams): An instance of the Teams class. + """ + try: + # Get all teams in the organization + get_all_teams_response = teams.get_all() + print(get_all_teams_response) + except requests.exceptions.RequestException as e: + print(f"Error occurred during API request: {str(e)}") + +def add_members(teams, team_id, member_ids): + """ + Updates a team by adding members. + + Args: + teams (Teams): An instance of the Teams class. + team_id (str): The ID of the team to update. + member_ids (list): The IDs of the members to added to the team. + """ + try: + # Update team by adding members + member_object = [] + for id in member_ids: + member_object.append({"value": id}) + add_members_response = teams.add_members( + team_id, + member_object + ) + print(add_members_response) + except requests.exceptions.RequestException as e: + print(f"Error occurred during API request: {str(e)}") + +def remove_members(teams, team_id, member_ids): + """ + Updates a team by removing members. + + Args: + teams (Teams): An instance of the Teams class. + team_id (str): The ID of the team to update. + member_ids (list): The IDs of the members to removed from the team. + """ + try: + # Update team by removing members + member_object = [] + for id in member_ids: + member_object.append({"value": id}) + remove_members_response = teams.remove_members( + team_id, + member_object + ) + print(remove_members_response) + except requests.exceptions.RequestException as e: + print(f"Error occurred during API request: {str(e)}") + +if __name__ == "__main__": + username = "your_username" + api_key = "your_api_key" + base_url = "https://example.com/api" # Base URL of the API + + # Instantiate the Teams class with your credentials + teams = Teams(base_url, username, api_key) + + # Test Functions + get_all_teams(teams) + # create_team(teams, "team-name-asdfghjsdfgh",["VXNlcjoxNjg1NzA5"]) + # get_team(teams, "team_id") + # add_members(teams, "RW50aXR5OjIyMTgyMjI=", ["VXNlcjoxODcxODU1", "VXNlcjoxNjg1NzA5"]) + # remove_members(teams, "RW50aXR5OjIyMTgyMjI=", ["VXNlcjoxODcxODU1", "VXNlcjoxNjg1NzA5"]) \ No newline at end of file diff --git a/wandb-scim/examples/user_example.py b/wandb-scim/examples/user_example.py new file mode 100644 index 00000000..3ceda7ec --- /dev/null +++ b/wandb-scim/examples/user_example.py @@ -0,0 +1,125 @@ +# calling_module.py + +import requests +import sys +sys.path.append('../') +from users import User # Assuming the User class is defined in user_module.py + +def create_user(user, email, name): + """ + Creates a new user. + + Args: + user (User): An instance of the User class. + email (str): The email address of the new user. + name (str): The name of the new user. + """ + try: + # Create a new user + create_user_response = user.create( + request_payload={"email": email, "name": name} + ) + print(create_user_response) + except requests.exceptions.RequestException as e: + print(f"Error occurred during API request: {str(e)}") + +def get_user(user, user_id): + """ + Retrieves details of a specific user. + + Args: + user (User): An instance of the User class. + user_id (str): The ID of the user to retrieve. + """ + try: + # Get user details + get_user_response = user.get(user_id) + print(get_user_response) + except requests.exceptions.RequestException as e: + print(f"Error occurred during API request: {str(e)}") + +def get_all_users(user): + """ + Retrieves details of all users in the organization. + + Args: + user (User): An instance of the User class. + """ + try: + # Get all users in the organization + get_all_users_response = user.get_all() + print(get_all_users_response) + except requests.exceptions.RequestException as e: + print(f"Error occurred during API request: {str(e)}") + +def deactivate_user(user, user_id): + """ + Deactivates a user. + + Args: + user (User): An instance of the User class. + user_id (str): The ID of the user to deactivate. + """ + try: + # Deactivate a user + deactivate_user_response = user.deactivate(user_id) + print(deactivate_user_response) + except requests.exceptions.RequestException as e: + print(f"Error occurred during API request: {str(e)}") + +def assign_org_role_to_user(user, user_id, role_name): + """ + Assigns a org-level role to a user. + + Args: + user (User): An instance of the User class. + user_id (str): The ID of the user to assign the role to. + role_name (str): The name of the role to assign. + """ + try: + # Assign a role to the user + assign_org_role_response = user.assign_org_role( + user_id, + request_payload={"roleName": role_name} + ) + print(assign_org_role_response) + except requests.exceptions.RequestException as e: + print(f"Error occurred during API request: {str(e)}") + + +def assign_team_role_to_user(user, user_id, team_name, role_name): + """ + Assigns a team-level role to a user. + + Args: + user (User): An instance of the User class. + user_id (str): The ID of the user to assign the team role to. + team_name (str): The name of the team to assign the role from. + role_name (str): The name of the role to assign. + """ + try: + # Assign team role to a user + assign_team_role_response = user.assign_team_role( + user_id, + request_payload={"roleName": role_name, "teamName": team_name} + ) + print(assign_team_role_response) + except requests.exceptions.RequestException as e: + print(f"Error occurred during API request: {str(e)}") + + +if __name__ == "__main__": + username = "your_username" + api_key = "your_api_key" + base_url = "https://example.com/api" + + # Instantiate the User class with your credentials + user = User(base_url, username, api_key) + + # Test Functions + get_all_users(user) + # create_user(user, "test@example.com", "Test User") + # get_user(user, "user_id") + # deactivate_user(user, "user_id") + # assign_org_role_to_user(user, "user_id", "role_name") + # assign_team_role_to_user(user, "user_id", "team_name", "role_name") \ No newline at end of file diff --git a/wandb-scim/teams.py b/wandb-scim/teams.py new file mode 100644 index 00000000..fac80c5b --- /dev/null +++ b/wandb-scim/teams.py @@ -0,0 +1,181 @@ +import base64 +import json +import requests + +class Teams(object): + def __init__(self, base_url, username, api_key): + """ + Initializes the Teams object with username and API key. + + Args: + base_url (str): Host url. + username (str): The username for authentication. + api_key (str): The API key for authentication. + """ + # Encode the username and API key into a base64-encoded string for Basic Authentication + auth_str = f"{username}:{api_key}" + auth_bytes = auth_str.encode('ascii') + self.base_url = base_url + self.auth_token = base64.b64encode(auth_bytes).decode('ascii') + + # Create the authorization header for API requests + self.authorization_header = f"Basic {self.auth_token}" + + def create(self, request_payload): + """ + Creates a new team. + + Args: + request_payload (dict): The payload containing team data. + - 'displayName': The display name of the team. + - 'members' (list[dict]): The payload containing members information. + It should contain the key in each element: + - 'value': The id of the member to be added to the team. + + Returns: + str: A message indicating whether the team creation was successful or failed. + """ + print("Creating the team") + data = { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], + "displayName": request_payload['displayName'], + "members": request_payload['members'] + } + headers = { + "Authorization": self.authorization_header, + "Content-Type": "application/json" + } + # Send a POST request to create the team + url = f"{self.base_url}/scim/Groups" + response = requests.post(url, json=data, headers=headers) + + if response.status_code == 201: + return "Team has been created!" + return f"Team creation failed. Status code: {response.status_code}" + + def get(self, team_id): + """ + Retrieves team details. + + Args: + team_id (str): team_id of the team_id. + + Returns: + str: A message containing team details or indicating failure. + """ + print("Getting the team") + headers = { + "Authorization": self.authorization_header, + "Content-Type": "application/json" + } + # Send a GET request to retrieve the team + url = f"{self.base_url}/scim/Groups/{team_id}" + response = requests.get(url, headers=headers) + + if response.status_code == 200: + return json.dumps(response.text, indent=4) + return f"Get team failed. Status code: {response.status_code}" + + def get_all(self): + """ + Retrieves details of all teams in the organization. + + Returns: + str: A message containing details of all teams or indicating failure. + """ + print("Getting all the teams in org") + headers = { + "Authorization": self.authorization_header, + "Content-Type": "application/json" + } + # Send a GET request to retrieve all teams + url = f"{self.base_url}/scim/Groups" + response = requests.get(url, headers=headers) + + if response.status_code == 200: + return json.dumps(response.text, indent=4) + return f"Get teams failed. Status code: {response.status_code}" + + def add_members(self, team_id, request_payload): + """ + Adds a members to the team. + + Args: + team_id (str): team_id of the team_id. + request_payload (list[dict]): The payload containing members information. + It should contain the key in each element: + - 'value': The id of the member to be added to the team. + + Returns: + str: A message indicating whether the member addition was successful or failed. + """ + print("Adding member to the team") + data = { + "schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"], + "Operations": [ + { + "op": "add", + "path": "members", + "value": request_payload + } + ] + } + headers = { + "Authorization": self.authorization_header, + "Content-Type": "application/json" + } + # Send a PATCH request to add the member to the team + url = f"{self.base_url}/scim/Groups/{team_id}" + response = requests.patch(url, json=data, headers=headers) + + if response.status_code == 200: + updated_data = response.json() # Get the updated resource data from the response + print("Updated Data:", updated_data) + return "Team updated successfully" + + elif response.status_code == 404: + return "Team not found" + else: + return f"Failed to update team. Status code: {response.status_code}" + + def remove_members(self, team_id, request_payload): + """ + Removes members from the team. + + Args: + team_id (str): team_id of the team_id. + request_payload (list[dict]): The payload containing members information. + It should contain the following key in each element: + - 'value': The id of the member to be removed from the team. + + Returns: + str: A message indicating whether the member removal was successful or failed. + """ + print("Removing member from the team") + data = { + "schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"], + "Operations": [ + { + "op": "remove", + "path": "members", + "value": request_payload + } + ] + } + headers = { + "Authorization": self.authorization_header, + "Content-Type": "application/json" + } + # Send a PATCH request to remove the member from the team + url = f"{self.base_url}/scim/Groups/{team_id}" + response = requests.patch(url, json=data, headers=headers) + + if response.status_code == 200: + updated_data = response.json() # Get the updated resource data from the response + print("Updated Data:", updated_data) + return "Team updated successfully" + + elif response.status_code == 404: + return "Team not found" + else: + return f"Failed to update team. Status code: {response.status_code}" diff --git a/wandb-scim/users.py b/wandb-scim/users.py new file mode 100644 index 00000000..e9397ba3 --- /dev/null +++ b/wandb-scim/users.py @@ -0,0 +1,209 @@ +import base64 +import json +import requests + +class User(object): + def __init__(self, base_url, username, api_key): + """ + Initialize User object with username and API key. + + Args: + base_url (str): Host url. + username (str): The username for authentication. + api_key (str): The API key for authentication. + """ + # Encode the username and API key into a base64-encoded string for Basic Authentication + auth_str = f"{username}:{api_key}" + auth_bytes = auth_str.encode('ascii') + self.base_url = base_url + self.auth_token = base64.b64encode(auth_bytes).decode('ascii') + + # Create the authorization header for API requests + self.authorization_header = f"Basic {self.auth_token}" + + def create(self, request_payload): + """ + Creates a new user. + + Args: + request_payload (dict): The payload containing user data. + + Returns: + str: A message indicating whether the user creation was successful or failed. + """ + print("Creating the User") + data = { + "schemas": [ + "urn:ietf:params:scim:schemas:core:2.0:User" + ], + "emails": [ + { + "primary": True, + "value": request_payload['email'] + } + ], + "userName": request_payload['name'] + } + headers = { + "Authorization": self.authorization_header, + "Content-Type": "application/json" + } + # Send a POST request to create the user + url = f"{self.base_url}/scim/Users" + response = requests.post(url, json=data, headers=headers) + if response.status_code == 201: + return "User has been created!" + return f"User creation failed. Status code: {response.status_code}" + + def get(self, user_id): + """ + Retrieves user details. + + Args: + user_id (str): user_id of the user. + + Returns: + str: A message containing user details or indicating failure. + """ + print("Getting the User") + headers = { + "Authorization": self.authorization_header, + "Content-Type": "application/json" + } + # Send a GET request to retrieve the user + url = f"{self.base_url}/scim/Users/{user_id}" + response = requests.get(url, headers=headers) + + if response.status_code == 200: + return json.dumps(response.text, indent=4) + return f"Get user failed. Status code: {response.status_code}" + + def get_all(self): + """ + Retrieves details of all users in the organization. + + Returns: + str: A message containing details of all users or indicating failure. + """ + print("Getting all the Users in org") + headers = { + "Authorization": self.authorization_header, + "Content-Type": "application/json" + } + url = f"{self.base_url}/scim/Users" + # Send a GET request to retrieve all users + response = requests.get(url, headers=headers) + + if response.status_code == 200: + return json.dumps(response.text, indent=4) + return f"Get users failed. Status code: {response.status_code}" + + def deactivate(self, user_id): + """ + Deactivates a user. + + Args: + user_id (str): user_id of the user. + + Returns: + str: A message indicating whether the user deactivation was successful or failed. + """ + print("deleting the User") + headers = { + "Authorization": self.authorization_header, + "Content-Type": "application/json" + } + # Send a DELETE request to deactivate the user + url = f"{self.base_url}/scim/Users/{user_id}" + response = requests.delete(url, headers=headers) + + if response.status_code == 204: + return "User has deleted successfully!" + elif response.status_code == 404: + return "User not found" + else: + return f"Failed to delete user. Status code: {response.status_code}" + + def assign_org_role(self, user_id, request_payload): + """ + Assigns a role to a user. + + Args: + user_id (str): user_id of the user. + request_payload (dict): The payload containing role information. + It should contain the following key: + - 'roleName': The role to be assigned to the user. It can be one of 'admin', 'viewer', or 'member'. + + Returns: + str: A message indicating whether the role assignment was successful or failed. + """ + print("Assign a role to the User") + data = { + "schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"], + "Operations": [ + { + "op": "replace", + "path": "organizationRole", + "value": request_payload['roleName'] + } + ] + } + headers = { + "Authorization": self.authorization_header, + "Content-Type": "application/json" + } + # Send a PATCH request to assign the role to the user + url = f"{self.base_url}/scim/Users/{user_id}" + response = requests.patch(url, json=data, headers=headers) + if response.status_code == 200: + updated_data = response.json() # Get the updated resource data from the response + print("Updated Data:", updated_data) + return "User updated successfully" + elif response.status_code == 404: + return "User not found" + else: + return f"Failed to update user. Status code: {response.status_code}" + + def assign_team_role(self, user_id, request_payload): + """ + Assigns a role to a user of the team. + + Args: + user_id (str): user_id of the user. + request_payload (dict): The payload containing role information. + + Returns: + str: A message indicating whether the role assignment was successful or failed. + """ + print("assign a role to the User of the team") + data = { + "schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"], + "Operations": [ + { + "op": "replace", + "path": "teamRoles", + "value": [ + { + "roleName": request_payload['roleName'], + "teamName": request_payload['roleName'] + } + ] + } + ] + } + headers = { + "Authorization": self.authorization_header, + "Content-Type": "application/json" + } + # Send a PATCH request to assign the role to the user of the team + url = f"{self.base_url}/scim/Users/{user_id}" + response = requests.patch(url, json=data, headers=headers) + + if response.status_code == 200: + updated_data = response.json() # Get the updated resource data from the response + print("Updated Data:", updated_data) + return "User updated successfully" + elif response.status_code == 404: + return "User not found" + else: + return f"Failed to update user. Status code: {response.status_code}"