"""
Workflows API endpoints
"""
import logging
from drf_spectacular.utils import OpenApiParameter, extend_schema
from rest_framework import serializers, status
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.viewsets import ModelViewSet
from apps.taskman.service import JiraTaskmanQuerier
from osidb.api_views import RudimentaryUserPathLoggingMixin, get_valid_http_methods
from .exceptions import WorkflowsException
from .helpers import get_flaw_or_404, str2bool
from .serializers import (
ClassificationWorkflowSerializer,
RejectSerializer,
WorkflowSerializer,
)
from .workflow import WorkflowFramework
# Module-level logger shared by every view below.
# NOTE(review): several handlers interpolate request-supplied identifiers into
# log messages; the Jira/Bugzilla API keys read from headers must never be
# added to them.
logger = logging.getLogger(__name__)
class index(RudimentaryUserPathLoggingMixin, APIView):
    """index API endpoint"""

    # No permission_classes override here, so access control is whatever the
    # project-wide DRF defaults (not visible in this file) provide.
    def get(self, request, *args, **kwargs):
        """index API endpoint listing available API endpoints"""
        logger.info("getting index")
        # Imported at call time rather than at module level; presumably .urls
        # imports this module, so a top-level import would be circular.
        from .urls import urlpatterns

        endpoints = []
        for url in urlpatterns:
            endpoints.append(f"/{url.pattern}")
        return Response({"index": endpoints})
# TODO do we need this when Workflows is baked into OSIDB service ?
class healthy(RudimentaryUserPathLoggingMixin, APIView):
    """unauthenticated health check API endpoint"""

    # The only view in this file that opts out of authentication. It is safe
    # to expose only as long as the response stays empty: anything added to
    # the body here is readable by anonymous callers.
    permission_classes = [AllowAny]

    def get(self, request, *args, **kwargs):
        """
        unauthenticated health check API endpoint
        """
        logger.info("getting status")
        empty_ok = Response()
        return empty_ok
class adjust(RudimentaryUserPathLoggingMixin, APIView):
    """workflow adjustion API endpoint"""

    # Allowed HTTP verbs are derived by the shared osidb.api_views helper;
    # what it returns for ModelViewSet is not visible from this file.
    http_method_names = get_valid_http_methods(ModelViewSet)

    def post(self, request, pk):
        """
        workflow adjustion API endpoint
        adjust workflow classification of flaw identified by UUID or CVE
        and return its workflow:state classification (new if changed and old otherwise)
        adjust operation is idempotent so when the classification
        is already adjusted running it results in no operation
        """
        # NOTE(review): pk comes straight from the URL and is written to the
        # log before get_flaw_or_404 has validated it; confirm the URL pattern
        # constrains it so it cannot carry log-forging characters.
        logger.info(f"adjusting flaw {pk} workflow classification")
        flaw = get_flaw_or_404(pk)
        # State-changing call with no request body and no per-user token;
        # authorization relies on the inherited permission classes.
        flaw.adjust_classification()
        result = {"flaw": flaw.pk, "classification": flaw.classification}
        return Response(result)
class promote(RudimentaryUserPathLoggingMixin, APIView):
    """workflow promote API endpoint"""

    http_method_names = get_valid_http_methods(ModelViewSet)

    # NOTE(review): the schema below declares Bugzilla-Api-Key as required,
    # but the handler only rejects a missing Jira-Api-Key; a missing Bugzilla
    # key is forwarded as None. Confirm which of the two is intended.
    @extend_schema(
        parameters=[
            OpenApiParameter(
                name="Jira-Api-Key",
                required=True,
                type=str,
                location=OpenApiParameter.HEADER,
                description="User generated api key for Jira authentication.",
            ),
            OpenApiParameter(
                name="Bugzilla-Api-Key",
                required=True,
                type=str,
                location=OpenApiParameter.HEADER,
                description="User generated api key for Bugzilla authentication.",
            ),
        ]
    )
    def post(self, request, flaw_id):
        """
        workflow promotion API endpoint
        try to adjust workflow classification of flaw to the next state available
        return its workflow:state classification or errors if not possible to promote
        """
        # NOTE(review): flaw_id is request-supplied and logged before the
        # lookup validates it; confirm the URL pattern constrains it.
        logger.info(f"promoting flaw {flaw_id} workflow classification")
        flaw = get_flaw_or_404(flaw_id)

        # Per-user credentials for the downstream trackers arrive as request
        # headers. They are secrets: pass them on, never log or echo them.
        headers = request.META
        jira_token = headers.get("HTTP_JIRA_API_KEY")
        bz_token = headers.get("HTTP_BUGZILLA_API_KEY")
        if not jira_token:
            # Not a WorkflowsException, so it is not turned into a 409 below;
            # it propagates to DRF's exception handling.
            raise serializers.ValidationError(
                {"Jira-Api-Key": "This HTTP header is required."}
            )

        try:
            flaw.promote(jira_token=jira_token, bz_api_key=bz_token)
            payload = {"flaw": flaw.pk, "classification": flaw.classification}
            return Response(payload)
        except WorkflowsException as e:
            # NOTE(review): the exception text is returned to the caller
            # verbatim; make sure WorkflowsException messages never embed
            # tokens or raw upstream responses.
            return Response({"errors": str(e)}, status=status.HTTP_409_CONFLICT)
class reject(RudimentaryUserPathLoggingMixin, APIView):
    """workflow reject API endpoint"""

    http_method_names = get_valid_http_methods(ModelViewSet)

    # NOTE(review): the schema below declares Bugzilla-Api-Key as required,
    # but the handler only rejects a missing Jira-Api-Key; a missing Bugzilla
    # key is forwarded as None. Confirm which of the two is intended.
    @extend_schema(
        parameters=[
            OpenApiParameter(
                name="Jira-Api-Key",
                required=True,
                type=str,
                location=OpenApiParameter.HEADER,
                description="User generated api key for Jira authentication.",
            ),
            OpenApiParameter(
                name="Bugzilla-Api-Key",
                required=True,
                type=str,
                location=OpenApiParameter.HEADER,
                description="User generated api key for Bugzilla authentication.",
            ),
        ],
        request=RejectSerializer,
    )
    def post(self, request, flaw_id):
        """
        workflow promotion API endpoint
        try to reject a flaw / task
        """
        # NOTE(review): the first docstring line says "promotion" and looks
        # copied from the promote view. It is left as is because DRF tooling
        # can surface view docstrings in generated API descriptions.

        # Validate the body first so malformed requests fail before any lookup.
        serializer = RejectSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        # NOTE(review): flaw_id is request-supplied and logged before the
        # lookup validates it; confirm the URL pattern constrains it.
        logger.info(f"rejecting flaw {flaw_id} workflow classification")
        flaw = get_flaw_or_404(flaw_id)

        # Per-user credentials for the downstream trackers arrive as request
        # headers. They are secrets: pass them on, never log or echo them.
        headers = request.META
        jira_token = headers.get("HTTP_JIRA_API_KEY")
        bz_token = headers.get("HTTP_BUGZILLA_API_KEY")
        if not jira_token:
            # Not a WorkflowsException, so it is not turned into a 409 below;
            # it propagates to DRF's exception handling.
            raise serializers.ValidationError(
                {"Jira-Api-Key": "This HTTP header is required."}
            )

        try:
            flaw.reject(jira_token=jira_token, bz_api_key=bz_token)
            # NOTE(review): the comment body is read from the raw request
            # data, not from serializer.validated_data, so whatever cleaning
            # RejectSerializer applies to "reason" is bypassed here.
            # NOTE(review): the rejection has already happened at this point;
            # if posting the comment fails, the flaw stays rejected without a
            # recorded reason.
            task_client = JiraTaskmanQuerier(token=jira_token)
            task_client.create_comment(
                issue_key=flaw.task_key,
                body=request.data["reason"],
            )
            payload = {"flaw": flaw.pk, "classification": flaw.classification}
            return Response(payload)
        except WorkflowsException as e:
            # NOTE(review): the exception text is returned to the caller
            # verbatim; make sure WorkflowsException messages never embed
            # tokens or raw upstream responses.
            return Response({"errors": str(e)}, status=status.HTTP_409_CONFLICT)
class classification(RudimentaryUserPathLoggingMixin, APIView):
    """workflow classification API endpoint"""

    @extend_schema(
        parameters=[
            OpenApiParameter(
                "verbose",
                type={"type": "boolean"},
                location=OpenApiParameter.QUERY,
                description=(
                    "Return also workflows with flaw classification "
                    "which represents the reasoning of the result."
                ),
            ),
        ],
    )
    def get(self, request, pk):
        """
        workflow classification API endpoint
        for flaw identified by UUID or CVE returns its workflow:state classification
        params:
        verbose - return also workflows with flaw classification
        which represents the reasoning of the result
        """
        # NOTE(review): pk is request-supplied and logged before the lookup
        # validates it; confirm the URL pattern constrains it.
        logger.info(f"getting flaw {pk} workflow classification")
        flaw = get_flaw_or_404(pk)
        workflow, state = WorkflowFramework().classify(flaw)
        response = {
            "flaw": flaw.pk,
            "classification": {"workflow": workflow.name, "state": state.name},
        }

        # Optional verbose classification context. The raw query value is
        # only interpreted through str2bool; an absent parameter skips the
        # parsing entirely.
        verbose = request.GET.get("verbose")
        if verbose is not None and str2bool(verbose, "verbose"):
            reasoning = ClassificationWorkflowSerializer(
                WorkflowFramework().workflows,
                context={"flaw": flaw},
                many=True,
            )
            response["workflows"] = reasoning.data

        return Response(response)
class workflows(RudimentaryUserPathLoggingMixin, APIView):
    """workflow info API endpoint"""

    # No permission_classes override here, so access control is whatever the
    # project-wide DRF defaults (not visible in this file) provide.
    def get(self, request, *args, **kwargs):
        """workflow info API endpoint"""
        logger.info("getting workflows")
        serialized = WorkflowSerializer(WorkflowFramework().workflows, many=True)
        return Response({"workflows": serialized.data})