From 10f6b19902cef351d5e6013ab6e986f4f4c4a72f Mon Sep 17 00:00:00 2001 From: David <9059044+Tansito@users.noreply.github.com> Date: Wed, 10 Apr 2024 10:39:32 -0400 Subject: [PATCH] Refactorization for run_existing end-point (#1267) * Saving job config * Updated run_existing method * Applying tests to the serializers * Transform config to JSON * Uploaded tests * Fixed code format * Improve swagger documentation * Added logger to the upload end-point * Reorder upload program serializer * Reorder job serializer * Added loos for run_existing * Fix typo * Updated migration script --- .../api/migrations/0019_alter_job_status.py | 29 ++++ gateway/api/models.py | 10 +- gateway/api/serializers.py | 75 +++++++++- gateway/api/services.py | 2 + gateway/api/utils.py | 4 +- gateway/api/v1/serializers.py | 35 +++-- gateway/api/v1/views.py | 29 ++-- gateway/api/views.py | 130 +++++++++++------- gateway/main/settings.py | 5 + gateway/tests/api/test_v1_serializers.py | 98 +++++++++++-- 10 files changed, 322 insertions(+), 95 deletions(-) create mode 100644 gateway/api/migrations/0019_alter_job_status.py diff --git a/gateway/api/migrations/0019_alter_job_status.py b/gateway/api/migrations/0019_alter_job_status.py new file mode 100644 index 000000000..1d7293e9b --- /dev/null +++ b/gateway/api/migrations/0019_alter_job_status.py @@ -0,0 +1,29 @@ +# Generated by Django 4.2.11 on 2024-04-09 20:48 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("api", "0018_remove_program_public_delete_catalogentry"), + ] + + operations = [ + migrations.AlterField( + model_name="job", + name="status", + field=models.CharField( + choices=[ + ("PENDING", "Pending"), + ("RUNNING", "Running"), + ("STOPPED", "Stopped"), + ("SUCCEEDED", "Succeeded"), + ("QUEUED", "Queued"), + ("FAILED", "Failed"), + ], + default="QUEUED", + max_length=10, + ), + ), + ] diff --git a/gateway/api/models.py b/gateway/api/models.py index 566fb2335..c4297c2f3 100644 --- a/gateway/api/models.py +++ b/gateway/api/models.py @@ -19,7 +19,7 @@ class JobConfig(models.Model): """Job Configuration model.""" id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) - created = models.DateTimeField(auto_now_add=True) + created = models.DateTimeField(auto_now_add=True, editable=False) auto_scaling = models.BooleanField(default=False, null=True) workers = models.IntegerField( @@ -48,7 +48,7 @@ class JobConfig(models.Model): ) def __str__(self): - return self.id + return f"{self.id}" class Program(ExportModelOperationsMixin("program"), models.Model): @@ -123,7 +123,7 @@ class Job(models.Model): RUNNING_STATES = [RUNNING, PENDING] id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) - created = models.DateTimeField(auto_now_add=True) + created = models.DateTimeField(auto_now_add=True, editable=False) updated = models.DateTimeField(auto_now=True, null=True) program = models.ForeignKey(to=Program, on_delete=models.SET_NULL, null=True) @@ -137,7 +137,7 @@ class Job(models.Model): status = models.CharField( max_length=10, choices=JOB_STATUSES, - default=PENDING, + default=QUEUED, ) compute_resource = models.ForeignKey( ComputeResource, on_delete=models.SET_NULL, null=True, blank=True @@ -156,7 +156,7 @@ class Job(models.Model): ) def __str__(self): - return f"" + return f"" def in_terminal_state(self): """Returns true if job is in terminal state.""" diff --git a/gateway/api/serializers.py b/gateway/api/serializers.py index 88bdcdd92..5bc1a9d3f 100644 --- a/gateway/api/serializers.py +++ b/gateway/api/serializers.py @@ -6,10 +6,16 @@ Version serializers inherit from the different serializers. """ +import json +import logging from django.conf import settings from rest_framework import serializers + +from api.utils import build_env_variables, encrypt_env_vars from .models import Program, Job, JobConfig, RuntimeJob +logger = logging.getLogger("gateway.serializers") + class UploadProgramSerializer(serializers.ModelSerializer): """ @@ -30,9 +36,14 @@ def retrieve_one_by_title(self, title, author): ) def create(self, validated_data): + title = validated_data.get("title") + logger.info("Creating program [%s] with UploadProgramSerializer", title) return Program.objects.create(**validated_data) def update(self, instance, validated_data): + logger.info( + "Updating program [%s] with UploadProgramSerializer", instance.title + ) instance.arguments = validated_data.get("arguments", "{}") instance.entrypoint = validated_data.get("entrypoint") instance.dependencies = validated_data.get("dependencies", "[]") @@ -98,11 +109,32 @@ class Meta: model = Job -class ExistingProgramSerializer(serializers.Serializer): - """Serializer for launching existing program.""" +class RunExistingProgramSerializer(serializers.Serializer): + """ + Program serializer for the /run_existing end-point + """ title = serializers.CharField(max_length=255) arguments = serializers.CharField() + config = serializers.CharField() + + def retrieve_one_by_title(self, title, author): + """ + This method returns a Program entry if it finds an entry searching by the title, if not None + """ + return ( + Program.objects.filter(title=title, author=author) + .order_by("-created") + .first() + ) + + def to_representation(self, instance): + """ + Transforms string `config` to JSON + """ + representation = super().to_representation(instance) + representation["config"] = json.loads(representation["config"]) + return representation def update(self, instance, validated_data): pass @@ -111,6 +143,45 @@ def create(self, validated_data): pass +class RunExistingJobSerializer(serializers.ModelSerializer): + """ + Job serializer for the /run_existing end-point + """ + + class Meta: + model = Job + + def create(self, validated_data): + logger.info("Creating Job with RunExistingJobSerializer") + status = Job.QUEUED + program = validated_data.get("program") + arguments = validated_data.get("arguments", "{}") + author = validated_data.get("author") + config = validated_data.get("config", None) + + token = validated_data.pop("token") + carrier = validated_data.pop("carrier") + + job = Job( + status=status, + program=program, + arguments=arguments, + author=author, + config=config, + ) + + env = encrypt_env_vars(build_env_variables(token, job, arguments)) + try: + env["traceparent"] = carrier["traceparent"] + except KeyError: + pass + + job.env_vars = json.dumps(env) + job.save() + + return job + + class RuntimeJobSerializer(serializers.ModelSerializer): """ Serializer for the runtime job model. diff --git a/gateway/api/services.py b/gateway/api/services.py index 170e0e585..989f66c51 100644 --- a/gateway/api/services.py +++ b/gateway/api/services.py @@ -7,6 +7,8 @@ """ # pylint: disable=too-few-public-methods +# pylint: disable=duplicate-code +# Disable duplicate code due to refactorization. This file will be delited. import logging import json diff --git a/gateway/api/utils.py b/gateway/api/utils.py index c53eb8eb4..321cbed48 100644 --- a/gateway/api/utils.py +++ b/gateway/api/utils.py @@ -6,7 +6,7 @@ import re import time import uuid -from typing import Optional, Tuple, Union, Callable, Dict, Any +from typing import Optional, Tuple, Union, Callable, Dict from cryptography.fernet import Fernet from ray.dashboard.modules.job.common import JobStatus @@ -106,7 +106,7 @@ def decrypt_string(string: str) -> str: return fernet.decrypt(string.encode("utf-8")).decode("utf-8") -def build_env_variables(token, job: Job, arguments: Dict[str, Any]) -> Dict[str, str]: +def build_env_variables(token, job: Job, arguments: str) -> Dict[str, str]: """Builds env variables for job. Args: diff --git a/gateway/api/v1/serializers.py b/gateway/api/v1/serializers.py index a81527125..cb2f8b6ec 100644 --- a/gateway/api/v1/serializers.py +++ b/gateway/api/v1/serializers.py @@ -20,12 +20,6 @@ class Meta(serializers.ProgramSerializer.Meta): ] -class ExistingProgramSerializer(serializers.ExistingProgramSerializer): - """ - Existing program serializer first version. This serializer limitates the fields from Program. - """ - - class UploadProgramSerializer(serializers.UploadProgramSerializer): """ UploadProgramSerializer is used by the /upload end-point @@ -41,16 +35,11 @@ class Meta(serializers.UploadProgramSerializer.Meta): ] -class JobSerializer(serializers.JobSerializer): +class RunExistingProgramSerializer(serializers.RunExistingProgramSerializer): """ - Job serializer first version. Include basic fields from the initial model. + RunExistingProgramSerializer is used by the /upload end-point """ - program = ProgramSerializer(many=False) - - class Meta(serializers.JobSerializer.Meta): - fields = ["id", "result", "status", "program", "created"] - class JobConfigSerializer(serializers.JobConfigSerializer): """ @@ -67,6 +56,26 @@ class Meta(serializers.JobConfigSerializer.Meta): ] +class RunExistingJobSerializer(serializers.RunExistingJobSerializer): + """ + RunExistingJobSerializer is used by the /run_existing end-point + """ + + class Meta(serializers.RunExistingJobSerializer.Meta): + fields = ["id", "result", "status", "program", "created", "arguments"] + + +class JobSerializer(serializers.JobSerializer): + """ + Job serializer first version. Include basic fields from the initial model. + """ + + program = ProgramSerializer(many=False) + + class Meta(serializers.JobSerializer.Meta): + fields = ["id", "result", "status", "program", "created"] + + class RuntimeJobSerializer(serializers.RuntimeJobSerializer): """ Runtime job serializer first version. Serializer for the runtime job model. diff --git a/gateway/api/v1/views.py b/gateway/api/v1/views.py index 033c40c26..278c41a93 100644 --- a/gateway/api/v1/views.py +++ b/gateway/api/v1/views.py @@ -36,20 +36,24 @@ def get_service_job_class(): return v1_services.JobService @staticmethod - def get_serializer_job_class(): - return v1_serializers.JobSerializer + def get_serializer_job(*args, **kwargs): + return v1_serializers.JobSerializer(*args, **kwargs) @staticmethod - def get_serializer_existing_program_class(): - return v1_serializers.ExistingProgramSerializer + def get_serializer_job_config(*args, **kwargs): + return v1_serializers.JobConfigSerializer(*args, **kwargs) @staticmethod - def get_serializer_job_config_class(): - return v1_serializers.JobConfigSerializer + def get_serializer_upload_program(*args, **kwargs): + return v1_serializers.UploadProgramSerializer(*args, **kwargs) @staticmethod - def get_serializer_upload_program_class(*args, **kwargs): - return v1_serializers.UploadProgramSerializer(*args, **kwargs) + def get_serializer_run_existing_program(*args, **kwargs): + return v1_serializers.RunExistingProgramSerializer(*args, **kwargs) + + @staticmethod + def get_serializer_run_existing_job(*args, **kwargs): + return v1_serializers.RunExistingJobSerializer(*args, **kwargs) def get_serializer_class(self): return v1_serializers.ProgramSerializer @@ -63,6 +67,15 @@ def get_serializer_class(self): def upload(self, request): return super().upload(request) + @swagger_auto_schema( + operation_description="Run an existing Qiskit Pattern", + request_body=v1_serializers.RunExistingProgramSerializer, + responses={status.HTTP_200_OK: v1_serializers.RunExistingJobSerializer}, + ) + @action(methods=["POST"], detail=False) + def run_existing(self, request): + return super().run_existing(request) + class JobViewSet(views.JobViewSet): # pylint: disable=too-many-ancestors """ diff --git a/gateway/api/views.py b/gateway/api/views.py index 132e5ca49..2b0346181 100644 --- a/gateway/api/views.py +++ b/gateway/api/views.py @@ -29,13 +29,14 @@ from rest_framework.response import Response from utils import sanitize_file_path -from .exceptions import InternalServerErrorException, ResourceNotFoundException +from .exceptions import InternalServerErrorException from .models import Program, Job, RuntimeJob from .ray import get_job_handler from .serializers import ( JobSerializer, - ExistingProgramSerializer, JobConfigSerializer, + RunExistingJobSerializer, + RunExistingProgramSerializer, UploadProgramSerializer, ) from .services import JobService, ProgramService, JobConfigService @@ -88,36 +89,44 @@ def get_service_job_class(): return JobService @staticmethod - def get_serializer_job_class(): + def get_serializer_job(*args, **kwargs): """ This method returns Job serializer to be used in Program ViewSet. """ - return JobSerializer + return JobSerializer(*args, **kwargs) @staticmethod - def get_serializer_existing_program_class(): + def get_serializer_job_config(*args, **kwargs): """ - This method returns Existign Program serializer to be used in Program ViewSet. + This method returns Job Config serializer to be used in Program ViewSet. """ - return ExistingProgramSerializer + return JobConfigSerializer(*args, **kwargs) @staticmethod - def get_serializer_job_config_class(): + def get_serializer_upload_program(*args, **kwargs): """ - This method returns Job Config serializer to be used in Program ViewSet. + This method returns the program serializer for the upload end-point """ - return JobConfigSerializer + return UploadProgramSerializer(*args, **kwargs) @staticmethod - def get_serializer_upload_program_class(*args, **kwargs): + def get_serializer_run_existing_program(*args, **kwargs): """ - This method returns the program serializer for the upload end-point + This method returns the program serializer for the run_existing end-point """ - return UploadProgramSerializer(*args, **kwargs) + return RunExistingProgramSerializer(*args, **kwargs) + + @staticmethod + def get_serializer_run_existing_job(*args, **kwargs): + """ + This method returns the job serializer for the run_existing end-point + """ + + return RunExistingJobSerializer(*args, **kwargs) def get_serializer_class(self): return self.serializer_class @@ -139,23 +148,33 @@ def upload(self, request): tracer = trace.get_tracer("gateway.tracer") ctx = TraceContextTextMapPropagator().extract(carrier=request.headers) with tracer.start_as_current_span("gateway.program.upload", context=ctx): - serializer = self.get_serializer_upload_program_class(data=request.data) + serializer = self.get_serializer_upload_program(data=request.data) if not serializer.is_valid(): + logger.error( + "UploadProgramSerializer validation failed:\n %s", + serializer.errors, + ) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) title = serializer.validated_data.get("title") author = request.user program = serializer.retrieve_one_by_title(title=title, author=author) if program is not None: - serializer = self.get_serializer_upload_program_class( + logger.info("Program found. [%s] is going to be updated", title) + serializer = self.get_serializer_upload_program( program, data=request.data ) if not serializer.is_valid(): + logger.error( + "UploadProgramSerializer validation failed with program instance:\n %s", + serializer.errors, + ) return Response( serializer.errors, status=status.HTTP_400_BAD_REQUEST ) serializer.save(author=author) + logger.info("Return response with Program [%s]", title) return Response(serializer.data) @action(methods=["POST"], detail=False) @@ -164,58 +183,65 @@ def run_existing(self, request): tracer = trace.get_tracer("gateway.tracer") ctx = TraceContextTextMapPropagator().extract(carrier=request.headers) with tracer.start_as_current_span("gateway.program.run_existing", context=ctx): - serializer = self.get_serializer_existing_program_class()(data=request.data) + serializer = self.get_serializer_run_existing_program(data=request.data) if not serializer.is_valid(): + logger.error( + "RunExistingProgramSerializer validation failed:\n %s", + serializer.errors, + ) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) author = request.user - program = None - program_service = self.get_service_program_class() - try: - title = serializer.data.get("title") - program = program_service.find_one_by_title(title, author) - except ResourceNotFoundException as exception: - return Response(exception, exception.http_code) + title = serializer.data.get("title") + program = serializer.retrieve_one_by_title(title=title, author=author) + if program is None: + logger.error("Qiskit Pattern [%s] was not found.", title) + return Response( + {"message": f"Qiskit Pattern [{title}] was not found."}, + status=status.HTTP_404_NOT_FOUND, + ) jobconfig = None - config_data = request.data.get("config") - if config_data: - config_serializer = self.get_serializer_job_config_class()( - data=json.loads(config_data) - ) - if not config_serializer.is_valid(): - return Response( - config_serializer.errors, status=status.HTTP_400_BAD_REQUEST + config_json = serializer.data.get("config") + if config_json: + logger.info("Configuration for [%s] was found.", title) + job_config_serializer = self.get_serializer_job_config(data=config_json) + if not job_config_serializer.is_valid(): + logger.error( + "JobConfigSerializer validation failed:\n %s", + serializer.errors, ) - try: - jobconfig = ( - self.get_service_job_config_class().save_with_serializer( - config_serializer - ) + return Response( + job_config_serializer.errors, status=status.HTTP_400_BAD_REQUEST ) - except InternalServerErrorException as exception: - return Response(exception, exception.http_code) + jobconfig = job_config_serializer.save() + logger.info("JobConfig [%s] created.", jobconfig.id) - job = None carrier = {} TraceContextTextMapPropagator().inject(carrier) arguments = serializer.data.get("arguments") token = "" if request.auth: token = request.auth.token.decode() - try: - job = self.get_service_job_class().save( - program=program, - arguments=arguments, - author=author, - jobconfig=jobconfig, - token=token, - carrier=carrier, + job_serializer = self.get_serializer_run_existing_job(data={}) + if not job_serializer.is_valid(): + logger.error( + "RunExistingJobSerializer validation failed:\n %s", + serializer.errors, ) - except InternalServerErrorException as exception: - return Response(exception, exception.http_code) + return Response( + job_serializer.errors, status=status.HTTP_400_BAD_REQUEST + ) + job = job_serializer.save( + arguments=arguments, + author=author, + carrier=carrier, + token=token, + program=program, + config=jobconfig, + ) + logger.info("Returning Job [%s] created.", job.id) - job_serializer = self.get_serializer_job_class()(job) return Response(job_serializer.data) @action(methods=["POST"], detail=False) @@ -243,7 +269,7 @@ def run(self, request): jobconfig = None config_data = request.data.get("config") if config_data: - config_serializer = self.get_serializer_job_config_class()( + config_serializer = self.get_serializer_job_config( data=json.loads(config_data) ) if not config_serializer.is_valid(): @@ -276,7 +302,7 @@ def run(self, request): except InternalServerErrorException as exception: return Response(exception, exception.http_code) - job_serializer = self.get_serializer_job_class()(job) + job_serializer = self.get_serializer_job(job) return Response(job_serializer.data) diff --git a/gateway/main/settings.py b/gateway/main/settings.py index cf823a343..2c17a9136 100644 --- a/gateway/main/settings.py +++ b/gateway/main/settings.py @@ -131,6 +131,11 @@ "level": LOG_LEVEL, "propagate": False, }, + "gateway.serializers": { + "handlers": ["console"], + "level": LOG_LEVEL, + "propagate": False, + }, }, } diff --git a/gateway/tests/api/test_v1_serializers.py b/gateway/tests/api/test_v1_serializers.py index ee9f30da2..a1c29a23b 100644 --- a/gateway/tests/api/test_v1_serializers.py +++ b/gateway/tests/api/test_v1_serializers.py @@ -7,7 +7,12 @@ from django.core.files.uploadedfile import SimpleUploadedFile from django.core.files import File from rest_framework.test import APITestCase -from api.v1.serializers import JobConfigSerializer, UploadProgramSerializer +from api.v1.serializers import ( + JobConfigSerializer, + UploadProgramSerializer, + RunExistingProgramSerializer, + RunExistingJobSerializer, +) from api.models import JobConfig, Program @@ -46,9 +51,9 @@ def test_upload_program_serializer_creates_program(self): "resources", "artifact.tar", ) - data = File(open(path_to_resource_artifact, "rb")) + file_data = File(open(path_to_resource_artifact, "rb")) upload_file = SimpleUploadedFile( - "artifact.tar", data.read(), content_type="multipart/form-data" + "artifact.tar", file_data.read(), content_type="multipart/form-data" ) user = models.User.objects.get(username="test_user") @@ -74,6 +79,14 @@ def test_upload_program_serializer_creates_program(self): self.assertEqual(arguments, program.arguments) self.assertEqual(dependencies, program.dependencies) + def test_upload_program_serializer_check_empty_data(self): + data = {} + + serializer = UploadProgramSerializer(data=data) + self.assertFalse(serializer.is_valid()) + errors = serializer.errors + self.assertListEqual(["title", "entrypoint", "artifact"], list(errors.keys())) + def test_upload_program_serializer_fails_at_validation(self): path_to_resource_artifact = os.path.join( os.path.dirname(os.path.abspath(__file__)), @@ -81,23 +94,17 @@ def test_upload_program_serializer_fails_at_validation(self): "resources", "artifact.tar", ) - data = File(open(path_to_resource_artifact, "rb")) + file_data = File(open(path_to_resource_artifact, "rb")) upload_file = SimpleUploadedFile( - "artifact.tar", data.read(), content_type="multipart/form-data" + "artifact.tar", file_data.read(), content_type="multipart/form-data" ) title = "Hello world" entrypoint = "pattern.py" - - data = {} - - serializer = UploadProgramSerializer(data=data) - self.assertFalse(serializer.is_valid()) - errors = serializer.errors - self.assertListEqual(["title", "entrypoint", "artifact"], list(errors.keys())) - arguments = {} dependencies = [] + + data = {} data["title"] = title data["entrypoint"] = entrypoint data["artifact"] = upload_file @@ -108,3 +115,68 @@ def test_upload_program_serializer_fails_at_validation(self): self.assertFalse(serializer.is_valid()) errors = serializer.errors self.assertListEqual(["dependencies", "arguments"], list(errors.keys())) + + def test_run_existing_program_serializer_check_emtpy_data(self): + data = {} + + serializer = RunExistingProgramSerializer(data=data) + self.assertFalse(serializer.is_valid()) + errors = serializer.errors + self.assertListEqual(["title", "arguments", "config"], list(errors.keys())) + + def test_run_existing_program_serializer_fails_at_validation(self): + data = { + "title": "Program", + "arguments": {}, + "config": {}, + } + + serializer = RunExistingProgramSerializer(data=data) + self.assertFalse(serializer.is_valid()) + errors = serializer.errors + self.assertListEqual(["arguments", "config"], list(errors.keys())) + + def test_run_existing_program_serializer_config_json(self): + assert_json = { + "workers": None, + "min_workers": 1, + "max_workers": 5, + "auto_scaling": True, + } + data = { + "title": "Program", + "arguments": "{}", + "config": json.dumps(assert_json), + } + + serializer = RunExistingProgramSerializer(data=data) + self.assertTrue(serializer.is_valid()) + + config = serializer.data.get("config") + self.assertEqual(type(assert_json), type(config)) + self.assertDictEqual(assert_json, config) + + def test_run_existing_job_serializer_check_empty_data(self): + data = {} + + serializer = RunExistingJobSerializer(data=data) + self.assertTrue(serializer.is_valid()) + + def test_run_existing_job_serializer_creates_job(self): + user = models.User.objects.get(username="test_user") + program_instance = Program.objects.get( + id="1a7947f9-6ae8-4e3d-ac1e-e7d608deec82" + ) + arguments = "{}" + + job_serializer = RunExistingJobSerializer(data={}) + job_serializer.is_valid() + job = job_serializer.save( + arguments=arguments, + author=user, + carrier={}, + token="my_token", + program=program_instance, + config=None, + ) + self.assertIsNotNone(job)