-
Notifications
You must be signed in to change notification settings - Fork 9
/
workflow.py
335 lines (268 loc) · 11.3 KB
/
workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
"""
Workflows Framework
this is the heart of this app, implementing the logic over the workflow models
the workflows themselves are defined separately in the WORKFLOW_DIR directory set in constants
"""
import logging
from os import listdir
from os.path import join
import yaml
from django.db import models
from .constants import WORKFLOW_DIR
from .exceptions import (
LastStateException,
MissingRequirementsException,
MissingStateException,
MissingWorkflowException,
WorkflowDefinitionError,
)
from .helpers import singleton
from .models import Workflow
# module-level logger, named after this module
logger = logging.getLogger(__name__)
@singleton
class WorkflowFramework:
    """
    workflow operating framework

    loads and provides all available workflows
    and implements all workflow operating logic

    NOTE(review): the class is wrapped by the project ``singleton`` helper, so
    ``WorkflowFramework()`` presumably always yields the same instance - confirm
    against the helper implementation
    """

    # registry of the loaded workflows; register_workflow appends to this
    # class-level list in place and keeps it sorted in descending order
    _workflows = []

    @property
    def workflows(self):
        """
        workflows getter

        loads the workflows on the first run
        (more precisely whenever the registry is empty at access time)
        """
        if not self._workflows:
            self.load_workflows()
        return self._workflows

    def load_workflows(self):
        """
        workflows loader

        goes through the entries of WORKFLOW_DIR, skips everything not ending
        with ".yml" and registers a Workflow built from each parsed definition

        registration only appends, so calling the loader again on a non-empty
        registry registers the same definitions once more

        raises:
            WorkflowDefinitionError if a definition is not a parsable YAML
            or if processing it ends with KeyError or ValueError
        """
        for file in listdir(WORKFLOW_DIR):
            if not file.endswith(".yml"):
                continue

            try:
                with open(
                    file=join(WORKFLOW_DIR, file), mode="r", encoding="utf8"
                ) as stream:
                    # create and register workflow instance
                    logger.info(f"Processing workflow definition: {file}")
                    self.register_workflow(Workflow(yaml.safe_load(stream)))
            # malformed YAML is reported by the parser as yaml.YAMLError which
            # is not a ValueError, so it has to be listed explicitly in order
            # to be reported as an invalid definition like the other failures
            except (KeyError, ValueError, yaml.YAMLError) as exception:
                raise WorkflowDefinitionError(
                    f"Invalid workflow definition {file}"
                ) from exception

    def register_workflow(self, workflow):
        """
        workflow registration

        keeps the workflows sorted according to the highest priority
        (descending sort relying on the ordering defined by the workflows)
        """
        self._workflows.append(workflow)
        self._workflows.sort(reverse=True)

    def classify(self, instance, state=True):
        """
        classify the instance in the proper workflow
        and optionally in the proper state too (by default)

        the proper workflow is the most prior accepting workflow
        the workflows are required to contain a default workflow with empty conditions
        so there is always at least one applicable workflow to classify an instance in

        returns:
            (workflow, state) classification by default
            or only workflow classification if requested by state param set to False

            when no registered workflow accepts the instance the loop ends
            without a match and None is returned implicitly
        """
        for workflow in self.workflows:
            if workflow.accepts(instance):
                return (workflow, workflow.classify(instance)) if state else workflow

    def validate_classification(self, instance, target_workflow, target_state):
        """
        This method will evaluate if it is possible to classify the current
        instance as a target state

        Only the first registered workflow named target_workflow is consulted

        Raise MissingRequirementsException if instance lacks requirements
        for the target state
        Raise MissingStateException if no workflow of the given name is registered

        This method does NOT update the flaw classification
        """
        for workflow in self.workflows:
            if workflow.name != target_workflow:
                continue

            not_met_reqs = workflow.validate_classification(instance, target_state)
            if not_met_reqs:
                error_message = ",".join(not_met_reqs)
                error_message = f'Requirements for state "{target_state}" from "{target_workflow}" workflow are missing: [{error_message}].'
                raise MissingRequirementsException(error_message)
            return

        raise MissingStateException(
            f"Workflow ({target_workflow}) was not found in WorkflowFramework."
        )

    def jira_to_state(self, jira_state, jira_resolution):
        """
        Given the current Jira state and resolution, find the corresponding workflow state

        returns:
            (workflow name, state name) of the first state whose jira_state and
            jira_resolution both equal the given values
            or (None, None) if there is no such state in any workflow
        """
        for workflow in self.workflows:
            for state in workflow.states:
                if (
                    state.jira_state == jira_state
                    and state.jira_resolution == jira_resolution
                ):
                    return workflow.name, state.name
        return None, None

    def jira_status(self, instance):
        """
        Given an instance, return expected jira status and resolution

        the state is looked up by instance.workflow_state in every workflow
        whose name equals instance.workflow_name

        Raise MissingStateException if the combination is not registered
        """
        for workflow in self.workflows:
            if workflow.name == instance.workflow_name:
                for state in workflow.states:
                    if state.name == instance.workflow_state:
                        return state.jira_state, state.jira_resolution

        raise MissingStateException(
            f"Combination of workflow ({instance.workflow_name}) and state ({instance.workflow_state}) was not found in WorkflowFramework."
        )
class WorkflowModel(models.Model):
    """
    abstract base model of the objects managed by the workflows

    stores the workflow classification (workflow name and state) next to
    the owner, group, task and team fields and provides the operations
    computing, validating and changing the classification
    """

    class WorkflowState(models.TextChoices):
        """states an instance may be stored in"""

        NOVALUE = ""
        NEW = "NEW"
        TRIAGE = "TRIAGE"
        PRE_SECONDARY_ASSESSMENT = "PRE_SECONDARY_ASSESSMENT"
        SECONDARY_ASSESSMENT = "SECONDARY_ASSESSMENT"
        DONE = "DONE"
        REJECTED = "REJECTED"

    workflow_name = models.CharField(max_length=50, blank=True, default="DEFAULT")
    workflow_state = models.CharField(
        choices=WorkflowState.choices,
        max_length=24,
        blank=True,
        default=WorkflowState.NOVALUE,
    )
    owner = models.CharField(max_length=60, blank=True)
    group_key = models.CharField(max_length=60, blank=True)
    task_key = models.CharField(max_length=60, blank=True)
    task_updated_dt = models.DateTimeField(null=True, blank=True)
    team_id = models.CharField(max_length=8, blank=True)

    class Meta:
        abstract = True

    def classify(self):
        """
        computed workflow classification

        asks the framework for the workflow and state matching
        the instance and returns their names as a dictionary
        """
        workflow, state = WorkflowFramework().classify(self)
        return dict(workflow=workflow.name, state=state.name)

    @property
    def classification(self):
        """stored workflow classification as a dictionary of names"""
        return dict(workflow=self.workflow_name, state=self.workflow_state)

    @classification.setter
    def classification(self, classification):
        """
        setter for stored workflow classification

        accepts either a dictionary with "workflow" and "state" keys
        or a (workflow, state) pair while each of the two values
        may be either a name or an object providing the name attribute
        """
        if isinstance(classification, dict):
            workflow, state = classification["workflow"], classification["state"]
        else:
            workflow, state = classification

        if isinstance(workflow, str):
            self.workflow_name = workflow
        else:
            self.workflow_name = workflow.name

        if isinstance(state, str):
            self.workflow_state = state
        else:
            self.workflow_state = state.name

    def adjust_classification(self, save=True):
        """
        automatic classification update

        recomputes the classification and stores it when it differs from
        the stored one - state changes are currently meant to be done
        manually so consider using the promote method instead

        the changed model is saved unless turned off by the save argument
        """
        computed = self.classify()
        if computed == self.classification:
            # nothing changed so there is nothing to store
            return

        self.classification = computed
        if save:
            self.save()

    def validate_classification(self, target_workflow, target_state):
        """
        check whether the instance may be classified as the target state

        delegates to the framework which raises an exception when
        the requirements of the target state are not met

        the stored classification is NOT changed by this method
        """
        framework = WorkflowFramework()
        framework.validate_classification(self, target_workflow, target_state)

    def promote(self, save=True, jira_token=None, **kwargs):
        """
        canonical way of changing the classification

        moves the instance to the state following the current one
        in the list of states of its workflow

        raises:
            MissingWorkflowException if the stored workflow is not registered
            MissingStateException if the stored state is not in the workflow
            LastStateException if there is no following state
            and whatever the validation of the following state raises
        """
        workflows = WorkflowFramework().workflows
        workflow_obj = next(
            (
                candidate
                for candidate in workflows
                if candidate.name == self.workflow_name
            ),
            None,
        )
        if not workflow_obj:
            raise MissingWorkflowException(
                f"Instance is classified with a non-registered workflow ({self.workflow_name})."
            )

        state_obj = None
        states = workflow_obj.states
        for position, current in enumerate(states):
            if current.name != self.workflow_state:
                continue

            following = position + 1
            if following >= len(states):
                raise LastStateException(
                    f"Instance is already in the last state ({self.workflow_state}) from its workflow ({self.workflow_name})."
                )
            state_obj = states[following]
            break

        if not state_obj:
            raise MissingStateException(
                f"Instance is classified with a non-registered state ({self.workflow_state})."
            )

        WorkflowFramework().validate_classification(
            self, self.workflow_name, state_obj.name
        )
        self.workflow_state = state_obj.name

        if save:
            # the Jira and BZ saves are kept apart as the BZ one can be done
            # asynchronously (the Jira token is passed directly while the BZ
            # token comes in kwargs)
            self.save(jira_token=jira_token, raise_validation_error=False)
            self.save(raise_validation_error=False, **kwargs)

    def reject(self, save=True, jira_token=None, **kwargs):
        """
        canonical way of rejecting a flaw / task

        validates the rejected state of the rejected workflow and on success
        stores both as the classification of the instance

        raises whatever the validation raises when the requirements
        of the rejected state are not met
        """
        reject_workflow = "REJECTED"
        rejected_state = WorkflowModel.WorkflowState.REJECTED

        WorkflowFramework().validate_classification(
            self, reject_workflow, rejected_state
        )
        self.workflow_name = reject_workflow
        self.workflow_state = rejected_state

        if save:
            # the Jira and BZ saves are kept apart as the BZ one can be done
            # asynchronously (the Jira token is passed directly while the BZ
            # token comes in kwargs)
            self.save(jira_token=jira_token, raise_validation_error=False)
            self.save(raise_validation_error=False, **kwargs)

    def jira_status(self):
        """Jira status and resolution the framework expects for the instance"""
        framework = WorkflowFramework()
        return framework.jira_status(self)

    def adjust_acls(self, save=True):
        """
        make an internal instance public once its state requires it

        nothing happens while the state still supports internal ACLs
        or when the instance is not internal

        the changed model is saved unless turned off by the save argument
        """
        # internal ACLs are fine before the triage is completed
        # and also when the instance was rejected during the triage
        states = WorkflowModel.WorkflowState
        internal_supported = (states.NEW, states.TRIAGE, states.REJECTED)

        if self.workflow_state not in internal_supported and self.is_internal:
            self.set_public()
            # the ACLs of all related objects except for snippets follow
            self.set_public_nested()
            if save:
                self.save()