From 7f9f6ec5ab990f174b92113070ccad2bb1511ff1 Mon Sep 17 00:00:00 2001 From: Fernando Barreiro Date: Tue, 3 Dec 2024 16:10:00 +0100 Subject: [PATCH] documentation progress --- pandaserver/api/openapi_generator.py | 9 +- pandaserver/api/v1/harvester_api.py | 205 +++++++++++++++++---------- 2 files changed, 131 insertions(+), 83 deletions(-) diff --git a/pandaserver/api/openapi_generator.py b/pandaserver/api/openapi_generator.py index dd1ae344..1400b579 100644 --- a/pandaserver/api/openapi_generator.py +++ b/pandaserver/api/openapi_generator.py @@ -107,7 +107,7 @@ def extract_parameters(parsed): elif "bool" in param.type_name: param_type = "boolean" - # print(f"Param: {param.arg_name}, Type: {param_type}, Optional: {param.is_optional}") + print(f"Param: {param.arg_name}, Type: {param_type}, Optional: {param.is_optional} {param.description}") is_required = not param.is_optional parameter_schema = {"name": param.arg_name, "in": "query", "required": is_required, "schema": {"type": param_type}, "description": param.description} @@ -148,7 +148,7 @@ def extract_parameters_as_json(parsed): param_schema = {"type": param_type, "description": param.description} if param_type == "array": - param_schema["items"] = {"type": "string"} # Default array item type + param_schema["items"] = {"type": "object"} # Default array item type properties[param.arg_name] = param_schema @@ -236,6 +236,5 @@ def convert_docstrings_to_openapi(docstrings): # Convert docstrings to OpenAPI open_api_doc = convert_docstrings_to_openapi(docstrings) - - yaml_spec = yaml.dump(open_api_doc, sort_keys=False) - print(yaml_spec) + with open("/tmp/panda_api.yaml", "w") as output_file: + yaml.dump(open_api_doc, output_file, sort_keys=False) diff --git a/pandaserver/api/v1/harvester_api.py b/pandaserver/api/v1/harvester_api.py index 377dd2ec..35bed10b 100644 --- a/pandaserver/api/v1/harvester_api.py +++ b/pandaserver/api/v1/harvester_api.py @@ -28,21 +28,33 @@ def init_task_buffer(task_buffer: TaskBuffer) -> None: @request_validation(_logger, secure=True) -def update_workers(req: PandaRequest, harvester_id: str, workers: List) -> Tuple: +def update_workers(req: PandaRequest, harvester_id: str, workers: List) -> dict: """ - Update workers. Requires a secure connection. + Update workers. + + Update the details for a list of workers. Requires a secure connection. API details: HTTP Method: POST - Path: /harvester/v1/update_workers + Path: /v1/harvester/update_workers Args: req(PandaRequest): internally generated request object - harvester_id(str): string containing the harvester id - workers(list): TODO + harvester_id(str): harvester id, e.g. `harvester_central_A` + workers(list): list of worker dictionaries that describe the fields of a pandaserver/taskbuffer/WorkerSpec object. + ``` + [{"workerID": 1, "batchID": 1, "queueName": "queue1", "status": "running", + "computingSite": "site1", "nCore": 1, "nodeID": None, + "submitTime": "02-NOV-24 00:02:18", "startTime": "02-NOV-24 00:02:18", "endTime": None, + "jobType": "managed", "resourceType": "SCORE", "nativeExitCode": None, "nativeStatus": None, + "diagMessage": None, "nJobs": 1, "computingElement": "ce1", "syncLevel": 0, + "submissionHost": "submissionhost1", "harvesterHost": "harvesterhost1", + "errorCode": None, "minRamCount": 2000},...] + ``` Returns: - tuple: tuple with a boolean and a message, e.g. (False, 'Error message') or (True, 'OK') + dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': }` + """ tmp_logger = LogWrapper(_logger, f"update_workers harvester_id={harvester_id}") tmp_logger.debug(f"Start") @@ -63,21 +75,42 @@ def update_workers(req: PandaRequest, harvester_id: str, workers: List) -> Tuple @request_validation(_logger, secure=True) -def update_harvester_service_metrics(req: PandaRequest, harvester_id: str, metrics: str) -> Tuple: +def update_harvester_service_metrics(req: PandaRequest, harvester_id: str, metrics: str) -> dict: """ - Update harvester service metrics. Requires a secure connection. + Update harvester service metrics. + + Update the service metrics for a harvester instance. Requires a secure connection. API details: HTTP Method: POST - Path: /harvester/v1/update_harvester_service_metrics + Path: /v1/harvester/update_harvester_service_metrics Args: req(PandaRequest): internally generated request object - harvester_id(str): string containing the harvester id - metrics(str): json dictionary containing the metrics to be updated in the PanDA database + harvester_id(str): harvester id, e.g. `harvester_central_A` + metrics(list): list of triplets `[[host, timestamp, metric_dict],[host, timestamp, metric_dict]...]`. The metric dictionary is json encoded, as it is stored in the database like that. + ``` + harvester_host = "harvester_host.cern.ch" + creation_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") + metric = { + "rss_mib": 2737.36, + "memory_pc": 39.19, + "cpu_pc": 15.23, + "volume_data_pc": 20.0, + "cert_lifetime": { + "/data/atlpan/proxy/x509up_u25606_prod": 81, + "/data/atlpan/proxy/x509up_u25606_pilot": 81, + "/cephfs/atlpan/harvester/proxy/x509up_u25606_prod": 96, + "/cephfs/atlpan/harvester/proxy/x509up_u25606_pilot": 96, + }, + } + + # DBProxy expects the metrics in json format and stores them directly in the database + metrics = [[creation_time, harvester_host, json.dumps(metric)]] + ``` Returns: - str: json string with the result of the operation, typically a tuple with a boolean and a message, e.g. (False, 'Error message') or (True, 'OK') + dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': }` """ tmp_logger = LogWrapper(_logger, f"update_harvester_service_metrics harvester_id={harvester_id}") tmp_logger.debug(f"Start") @@ -100,21 +133,33 @@ def update_harvester_service_metrics(req: PandaRequest, harvester_id: str, metri @request_validation(_logger, secure=True) -def add_harvester_dialogs(req: PandaRequest, harvester_id: str, dialogs: str) -> Tuple: +def add_harvester_dialogs(req: PandaRequest, harvester_id: str, dialogs: str) -> dict: """ - Add harvester dialog messages. Requires a secure connection. + Add harvester dialog messages. + + Add messages for a harvester instance. Requires a secure connection. API details: HTTP Method: POST - Path: /harvester/v1/add_harvester_dialogs + Path: /v1/harvester/add_harvester_dialogs Args: req(PandaRequest): internally generated request object - harvester_id(str): string containing the harvester id - dialogs(str): json dictionary with the dialog messages to be added to the PanDA database + harvester_id(str): harvester id, e.g. `harvester_central_A` + dialogs(list): list of dialog dictionaries, e.g. + ``` + dialogs = [{ + "diagID": 1, + "moduleName": "test_module", + "identifier": "test identifier", + "creationTime": datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"), + "messageLevel": "INFO", + "diagMessage": "test message", + },...] + ``` Returns: - str: json string with the result of the operation, typically a tuple with a boolean and a message, e.g. (False, 'Error message') or (True, 'OK') + dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': }` """ tmp_logger = LogWrapper(_logger, f"add_harvester_dialogs harvester_id={harvester_id}") tmp_logger.debug(f"Start") @@ -129,21 +174,23 @@ def add_harvester_dialogs(req: PandaRequest, harvester_id: str, dialogs: str) -> @request_validation(_logger, secure=True) -def harvester_heartbeat(req: PandaRequest, harvester_id: str, data: str = None) -> Tuple: +def harvester_heartbeat(req: PandaRequest, harvester_id: str, data: str = None) -> dict: """ - Heartbeat for harvester. User and host are retrieved from the request object and updated in the database. Requires a secure connection. + Heartbeat for harvester. + + Send a heartbeat for harvester and optionally update the instance data. User and host are retrieved from the request object and updated in the database. Requires a secure connection. API details: HTTP Method: POST - Path: /harvester/v1/add_harvester_dialogs + Path: /v1/harvester/add_harvester_dialogs Args: req(PandaRequest): internally generated request object - harvester_id(str): string containing the harvester id - data(str): list of data to be updated in the PanDA database + harvester_id(str): harvester id, e.g. `harvester_central_A` + data(list): list of data to be updated in the PanDA database Returns: - str: json string with the result of the operation, typically a tuple with a boolean and a message, e.g. (False, 'Error message') or (True, 'OK') + dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': }` """ tmp_logger = LogWrapper(_logger, f"harvester_heartbeat harvester_id={harvester_id}") tmp_logger.debug(f"Start") @@ -161,20 +208,22 @@ def harvester_heartbeat(req: PandaRequest, harvester_id: str, data: str = None) return generate_response(True) -def get_current_worker_id(req: PandaRequest, harvester_id: str) -> Tuple: +def get_current_worker_id(req: PandaRequest, harvester_id: str) -> dict: """ - TODO: Validate. Get the current worker ID. + Get the current worker ID. + + Retrieve the current worker ID. API details: HTTP Method: GET - Path: /harvester/v1/get_current_worker_id + Path: /v1/harvester/get_current_worker_id Args: req(PandaRequest): internally generated request object - harvester_id(str): string containing the harvester id + harvester_id(str): harvester id, e.g. `harvester_central_A` Returns: - str: json string with the result of the operation, typically a tuple with a boolean and a message, e.g. (False, 'Error message') or (True, ) + dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': }` """ tmp_logger = LogWrapper(_logger, f"get_current_worker_id") tmp_logger.debug(f"Start") @@ -187,19 +236,21 @@ def get_current_worker_id(req: PandaRequest, harvester_id: str) -> Tuple: return generate_response(True, data=current_worker_id) -def get_worker_statistics(req: PandaRequest) -> Tuple: +def get_worker_statistics(req: PandaRequest) -> dict: """ + Get worker statistics. + Get statistics for all the workers managed across the Grid. API details: HTTP Method: GET - Path: /harvester/v1/get_worker_statistics + Path: /v1/harvester/get_worker_statistics Args: req(PandaRequest): internally generated request object Returns: - str: json string with the result of the operation, typically a tuple with a boolean and the statistics or an error message, e.g. (False, 'Error message') or (True, {...}}) + dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': }` """ tmp_logger = LogWrapper(_logger, f"get_worker_statistics") tmp_logger.debug(f"Start") @@ -209,36 +260,27 @@ def get_worker_statistics(req: PandaRequest) -> Tuple: @request_validation(_logger, secure=True) -def report_worker_statistics(req: PandaRequest, harvester_id: str, panda_queue: str, statistics: str) -> Tuple: +def report_worker_statistics(req: PandaRequest, harvester_id: str, panda_queue: str, statistics: str) -> dict: """ + Report worker statistics. + Report statistics for the workers managed by a harvester instance at a PanDA queue. Requires a secure connection. API details: HTTP Method: POST - Path: /harvester/v1/report_worker_statistics + Path: /v1/harvester/report_worker_statistics Args: req (PandaRequest): Internally generated request object. - harvester_id(str): Harvester ID. - panda_queue(str): Name of the PanDA queue. - statistics(str): JSON string containing a dictionary with the statistics to be reported. - The format should follow this structure: - - :: - - { - "prodsourcelabel_1": { - "RESOURCE_TYPE_1": {"running": 1, "submitted": 2, ...}, - "RESOURCE_TYPE_2": {"running": 1, "submitted": 2, ...} - }, - "prodsourcelabel_2": { - ... - } - } + harvester_id(str): harvester id, e.g. `harvester_central_A` + panda_queue(str): Name of the PanDA queue, e.g. `CERN`. + statistics(str): JSON string containing a dictionary with the statistics to be reported. It will be stored as a json in the database. E.g. + ``` + json.dumps({"user": {"SCORE": {"running": 1, "submitted": 1}}, "managed": {"MCORE": {"running": 1, "submitted": 1}}}) + ``` Returns: - str: JSON string with the result of the operation, typically a tuple with a boolean and a message, - e.g., `(False, 'Error message')` or `(True, 'OK')`. + dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': }` """ tmp_logger = LogWrapper(_logger, f"report_worker_statistics harvester_id={harvester_id}") tmp_logger.debug(f"Start") @@ -248,22 +290,24 @@ def report_worker_statistics(req: PandaRequest, harvester_id: str, panda_queue: @request_validation(_logger, secure=True, production=True) -def get_harvester_commands(req: PandaRequest, harvester_id: str, n_commands: int, timeout: int = 30) -> Tuple: +def get_harvester_commands(req: PandaRequest, harvester_id: str, n_commands: int, timeout: int = 30) -> dict: """ + Get harvester commands. + Retrieves the commands for a specified harvester instance. Requires a secure connection and production role. API details: HTTP Method: GET - Path: /harvester/v1/get_harvester_commands + Path: /v1/harvester/get_harvester_commands Args: req(PandaRequest): The request object containing the environment variables. - harvester_id(str): The ID of the harvester instance. - n_commands(int): The number of commands to retrieve. - timeout(int, optional): The timeout value. Defaults to 30. + harvester_id(str): harvester id, e.g. `harvester_central_A` + n_commands(int): The number of commands to retrieve, e.g. `10`. + timeout(int, optional): The timeout value. Defaults to `30`. Returns: - dict: The response from the job dispatcher. + dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': }` """ tmp_logger = LogWrapper(_logger, f"get_harvester_commands") tmp_logger.debug(f"Start") @@ -288,21 +332,23 @@ def get_harvester_commands(req: PandaRequest, harvester_id: str, n_commands: int @request_validation(_logger, secure=True, production=True) -def acknowledge_harvester_commands(req: PandaRequest, command_ids: List, timeout: int = 30) -> Tuple: +def acknowledge_harvester_commands(req: PandaRequest, command_ids: List, timeout: int = 30) -> dict: """ + Acknowledge harvester commands. + Acknowledges the list of command IDs in the PanDA database. Requires a secure connection and production role. API details: HTTP Method: GET - Path: /harvester/v1/acknowledge_harvester_commands + Path: /v1/harvester/acknowledge_harvester_commands Args: req(PandaRequest): The request object containing the environment variables. - command_ids(str): A JSON string containing the list of command IDs to acknowledge. - timeout(int, optional): The timeout value. Defaults to 30. + command_ids(list): A list of command IDs to acknowledge, e.g. `[1, 2, 3, 4,...]`. + timeout(int, optional): The timeout value. Defaults to `30`. Returns: - dict: The response from the job dispatcher. + dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': }` """ tmp_logger = LogWrapper(_logger, f"acknowledge_harvester_commands") tmp_logger.debug(f"Start") @@ -327,23 +373,25 @@ def acknowledge_harvester_commands(req: PandaRequest, command_ids: List, timeout @request_validation(_logger, secure=True, production=True) -def add_sweep_harvester_command(req: PandaRequest, panda_queue: str, status_list: List[str], ce_list: List[str], submission_host_list: List[str]) -> Tuple: +def add_sweep_harvester_command(req: PandaRequest, panda_queue: str, status_list: List[str], ce_list: List[str], submission_host_list: List[str]) -> dict: """ + Add sweep command for harvester. + Send a command to harvester to kill the workers in a PanDA queue, with the possibility of specifying filters by status, CE or submission host. Requires a secure connection and production role. API details: HTTP Method: POST - Path: /harvester/v1/add_sweep_harvester_command + Path: /v1/harvester/add_sweep_harvester_command Args: req(PandaRequest): internally generated request object - panda_queue(str): name of the PanDA queue - status_list (list): list of worker statuses to be considered, e.g. ['submitted', 'running'] - ce_list (list): list of the Computing Elements to be considered - submission_host_list(list): list of the harvester submission hosts to be considered + panda_queue(str): Name of the PanDA queue, e.g. `CERN`. + status_list (list): list of worker statuses to be considered, e.g. `['submitted', 'running']` + ce_list (list): list of the Computing Elements to be considered, e.g. `['ce1.cern.ch', 'ce2.cern.ch']` + submission_host_list(list): list of the harvester submission hosts to be considered, e.g. `['submission_host1.cern.ch', 'submission_host2.cern.ch']` Returns: - str: json string with the result of the operation, typically a tuple with a boolean and a message, e.g. (False, 'Error message') or (True, 'OK') + dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': }` """ tmp_logger = LogWrapper(_logger, f"add_sweep_harvester_command panda_queue={panda_queue}") @@ -360,23 +408,24 @@ def add_sweep_harvester_command(req: PandaRequest, panda_queue: str, status_list @request_validation(_logger, secure=True, production=True) def add_target_slots(req, panda_queue: str, slots: int, global_share: str = None, resource_type: str = None, expiration_date: str = None): """ - Set the number of slots for a PanDA queue. Requires secure connection and production role. + Set target slots. + + Set the target number of slots for a PanDA queue, when you want to build up job pressure. Requires secure connection and production role. API details: HTTP Method: POST - Path: /harvester/v1/add_target_slots + Path: /v1/harvester/add_target_slots Args: req (PandaRequest): Internally generated request object. - panda_queue (str): Name of the PanDA queue. - slots (int): Number of slots to set. - global_share (str, optional): Global share the slots apply to. Optional - by default it applies to the whole queue. - resource_type (str, optional): Resource type (SCORE, MCORE,...) the slots apply to. Optional - by default it applies to the whole queue. + panda_queue(str): Name of the PanDA queue, e.g. `CERN`. + slots (int): Number of slots to set, e.g. `10000`. + global_share (str, optional): Global share the slots apply to. Optional - by default it applies to the whole queue. E.g. `User Analysis` + resource_type (str, optional): Resource type the slots apply to. Optional - by default it applies to the whole queue. E.g. `SCORE` or `MCORE`. expiration_date (str, optional): The expiration date of the slots. Optional - by default it applies indefinitely. Returns: - str: JSON string with the result of the operation, typically a tuple with a boolean and a message, - e.g., (False, 'Error message') or (True, '...'). + dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': }` """ tmp_logger = LogWrapper(_logger, f"add_target_slots panda_queue={panda_queue}") tmp_logger.debug(f"Start with slots={slots}, global_share={global_share}, resource_type={resource_type}, expiration_date={expiration_date}")