-
Notifications
You must be signed in to change notification settings - Fork 1
/
vectra_official.py
executable file
·2477 lines (2234 loc) · 112 KB
/
vectra_official.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import json
import requests
import warnings
import html
import re
import copy
import ipaddress
warnings.filterwarnings('always', '.*', PendingDeprecationWarning)
class HTTPException(Exception):
def __init__(self, response):
"""
Custom exception class to report possible API errors
The body is contructed by extracting the API error code from the requests.Response object
"""
try:
r = response.json()
if 'detail' in r:
detail = r['detail']
elif 'errors' in r:
detail = r['errors'][0]['title']
elif '_meta' in r:
detail = r['_meta']['message']
else:
detail = response.content
except Exception:
detail = response.content
body = 'Status code: {code} - {detail}'.format(code=str(response.status_code), detail=detail)
super().__init__(body)
def request_error_handler(func):
def request_handler(self, *args, **kwargs):
response = func(self, *args, **kwargs)
if response.status_code in [200, 201, 204]:
return response
else:
raise HTTPException(response)
return request_handler
def validate_api_v2(func):
def api_validator(self, *args, **kwargs):
if self.version == 2:
return func(self, *args, **kwargs)
else:
raise NotImplementedError('Method only accessible via v2 of API')
return api_validator
def deprecation(message):
warnings.warn(message, PendingDeprecationWarning)
def param_deprecation(key):
message = '{0} will be deprecated with Vectra API v1 which will be annouced in an upcoming release'.format(key)
warnings.warn(message, PendingDeprecationWarning)
class VectraClient(object):
def __init__(self, url=None, token=None, user=None, password=None, verify=False):
"""
Initialize Vectra client
:param url: IP or hostname of Vectra brain (ex https://www.example.com) - required
:param token: API token for authentication when using API v2*
:param user: Username to authenticate to Vectra brain when using API v1*
:param password: Password when using username to authenticate using API v1*
:param verify: Verify SSL (default: False) - optional
*Either token or user are required
"""
self.url = url
self.version = 2 if token else 1
self.verify = verify
url = VectraClient._remove_trailing_slashes(url)
if token:
self.url = '{url}/api/v2'.format(url=url)
self.headers = {
'Authorization': "Token " + token.strip(),
'Content-Type': "application/json",
'Cache-Control': "no-cache"
}
elif user and password:
self.url = '{url}/api'.format(url=url)
self.auth = (user, password)
deprecation('Deprecation of the Vectra API v1 will be announced in an upcoming release. Migrate to API v2'
' when possible')
else:
raise RuntimeError("At least one form of authentication is required. Please provide a token or username"
" and password")
@staticmethod
def _remove_trailing_slashes(url):
url = url[:-1] if url.endswith('/') else url
return url
@staticmethod
def _generate_campaign_params(args):
"""
Generate query parameters for campaigns based on provided args
:param args: dict of keys to generate query params
:rtype: dict
"""
params = {}
valid_keys = ['fields', 'dst_ip', 'target_domain', 'state', 'name', 'last_updated_gte',
'note_modified_timestamp_gte','page', 'page_size']
for k, v in args.items():
if k in valid_keys:
if v is not None: params[k] = v
else:
raise ValueError('argument {} is an invalid campaign query parameter'.format(str(k)))
return params
@staticmethod
def _generate_host_params(args):
"""
Generate query parameters for hosts based on provided args
:param args: dict of keys to generate query params
:rtype: dict
"""
params = {}
valid_keys = ['active_traffic', 'all', 'c_score', 'c_score_gte', 'certainty', 'certainty_gte',
'fields', 'has_active_traffic', 'include_detection_summaries', 'is_key_asset', 'is_targeting_key_asset',
'key_asset', 'last_detection_timestamp', 'last_source', 'mac_address', 'max_id', 'min_id',
'name', 'note_modified_timestamp_gte', 'ordering','page', 'page_size', 'privilege_category',
'privilege_level', 'privilege_level_gte', 'state', 't_score', 't_score_gte', 'tags',
'targets_key_asset', 'threat', 'threat_gte']
deprecated_keys = ['c_score', 'c_score_gte', 'key_asset', 't_score', 't_score_gte', 'targets_key_asset']
for k, v in args.items():
if k in valid_keys:
if v is not None: params[k] = v
else:
raise ValueError('argument {} is an invalid host query parameter'.format(str(k)))
if k in deprecated_keys: param_deprecation(k)
return params
@staticmethod
def _generate_host_by_id_params(args):
"""
Generate query parameters for host based on provided args
:param args: dict of keys to generate query params
:rtype: dict
"""
params = {}
valid_keys = ['fields', 'include_external', 'include_ldap']
for k, v in args.items():
if k in valid_keys:
if v is not None: params[k] = v
else:
raise ValueError('argument {} is an invalid host query parameter'.format(str(k)))
return params
@staticmethod
def _generate_detection_params(args):
"""
Generate query parameters for detections based on provided args
:param args: dict of keys to generate query params
:rtype: dict
"""
params = {}
valid_keys = ['c_score', 'c_score_gte', 'category', 'certainty', 'certainty_gte', 'description',
'detection', 'detection_category', 'detection_type', 'fields', 'host_id', 'is_targeting_key_asset',
'is_triaged', 'last_timestamp', 'max_id', 'min_id', 'note_modified_timestamp_gte', 'ordering',
'page', 'page_size', 'src_ip', 'state', 't_score', 't_score_gte', 'tags', 'targets_key_asset',
'threat', 'threat_gte']
deprecated_keys = ['c_score', 'c_score_gte', 'category', 'detection', 't_score', 't_score_gte', 'targets_key_asset']
for k, v in args.items():
if k in valid_keys:
if v is not None: params[k] = v
else:
raise ValueError('argument {} is an invalid detection query parameter'.format(str(k)))
if k in deprecated_keys: param_deprecation(k)
return params
@staticmethod
def _generate_group_params(args):
"""
Generate query parameters for groups based on provided args
:param args: dict of keys to generate query params
:rtype: dict
"""
params = {}
valid_keys = ['description', 'domains', 'host_ids', 'host_names', 'last_modified_by',
'last_modified_timestamp', 'name', 'page', 'page_size', 'type']
for k, v in args.items():
if k in valid_keys:
if v is not None: params[k] = v
else:
raise ValueError('argument {} is an invalid group query parameter'.format(str(k)))
return params
@staticmethod
def _generate_rule_params(args):
"""
Generate query parameters for rules based on provided args
:param args: dict of keys to generate query params
:rtype: dict
"""
params = {}
valid_keys = ['contains', 'fields', 'include_templates', 'page', 'page_size', 'ordering']
for k, v in args.items():
if k in valid_keys:
if v is not None: params[k] = v
else:
raise ValueError('argument {} is an invalid rule query parameter'.format(str(k)))
return params
@staticmethod
def _generate_rule_by_id_params(args):
"""
Generate query parameters for rule based on provided args
:param args: dict of keys to generate query params
:rtype: dict
"""
params = {}
valid_keys = ['fields']
for k, v in args.items():
if k in valid_keys:
if v is not None: params[k] = v
else:
raise ValueError('argument {} is an invalid rule query parameter'.format(str(k)))
return params
@staticmethod
def _generate_user_params(args):
"""
Generate query parameters for users based on provided args
:param args: dict of keys to generate query params
:rtype: dict
"""
params = {}
valid_keys = ['username', 'role', 'account_type', 'authentication_profile', 'last_login_gte']
for k, v in args.items():
if k in valid_keys:
if v is not None: params[k] = v
else:
raise ValueError('argument {} is an invalid user query parameter'.format(str(k)))
return params
@staticmethod
def _generate_ip_address_params(args):
"""
Generate query parameters for ip address queries based on provided args
:param args: dict of keys to generate query params
:rtype: dict
"""
params = {}
valid_keys = ['include_ipv4', 'include_ipv6']
for k, v in args.items():
if k in valid_keys:
if v is not None: params[k] = v
else:
raise ValueError('argument {} is an invalid ip address query parameter'.format(str(k)))
return params
@staticmethod
def _generate_subnet_params(args):
"""
Generate query parameters for subnet queries based on provided args
:param args: dict of keys to generate query params
:rtype: dict
"""
params = {}
valid_keys = ['ordering', 'search']
for k, v in args.items():
if k in valid_keys:
if v is not None: params[k] = v
else:
raise ValueError('argument {} is an invalid subnet query parameter'.format(str(k)))
return params
@staticmethod
def _generate_internal_network_params(args):
"""
Generate query parameters for internal network queries based on provided argsbased on provided args
:param args: dict of keys to generate query params
:rtype: dict
"""
params = {}
valid_keys = ['include_ipv4', 'include_ipv6']
for k, v in args.items():
if k in valid_keys:
if v is not None: params[k] = v
else:
raise ValueError('argument {} is an invalid internal network query parameter'.format(str(k)))
return params
@validate_api_v2
@request_error_handler
def _get_request(self, url, **kwargs):
"""
Do a get request on the provided URL
This is used by paginated endpoints
:rtype: requests.Response
"""
params = {}
for k, v in kwargs.items():
params[k] = v
if self.version == 2:
return requests.get(url, headers=self.headers, params=params, verify=self.verify)
else:
return requests.get(url, auth=self.auth, params=params, verify=self.verify)
@validate_api_v2
@request_error_handler
def get_campaigns(self, **kwargs):
"""
Query all campaigns - all parameters are optional
:param dst_ip: filter on campaign destination IP
:param target_domain: filter on campaign destination domain
:param state: campaign state, possible values are: init, active, closed, closed_never_active
:param name: filter on campaign name
:param last_updated_gte: return only campaigns with a last updated timestamp gte (datetime)
:param note_modified_timestamp_gte: return only campaigns with a last updated timestamp on their note gte (datetime)
:param fields: comma separated string of fields to be filtered and returned
possible values are: id, dst_ip, target_domain, state, name, last_updated,
note, note_modified_by, note_modified_timestamp
:param page: page number to return (int)
:param page_size: number of object to return in repsonse (int)
"""
return requests.get('{url}/campaigns'.format(url=self.url), headers=self.headers,
params=self._generate_campaign_params(kwargs), verify=self.verify)
def get_all_campaigns(self, **kwargs):
"""
Generator to retrieve all campaigns - all parameters are optional
:param dst_ip: filter on campaign destination IP
:param target_domain: filter on campaign destination domain
:param state: campaign state, possible values are: init, active, closed, closed_never_active
:param name: filter on campaign name
:param last_updated_gte: return only campaigns with a last updated timestamp gte (datetime)
:param note_modified_timestamp_gte: return only campaigns with a last updated timestamp on their note gte (datetime)
:param fields: comma separated string of fields to be filtered and returned
possible values are: id, dst_ip, target_domain, state, name, last_updated,
note, note_modified_by, note_modified_timestamp
:param page: page number to return (int)
:param page_size: number of object to return in repsonse (int)
"""
resp = requests.get('{url}/campaigns'.format(url=self.url), headers=self.headers,
params=self._generate_campaign_params(kwargs), verify=self.verify)
yield resp
while resp.json()['next']:
resp = self._get_request(url=resp.json()['next'])
yield resp
@validate_api_v2
@request_error_handler
def get_campaign_by_id(self, campaign_id=None, **kwargs):
"""
Get campaign by id
"""
if not campaign_id:
raise ValueError('Campaign id required')
return requests.get('{url}/campaigns/{id}'.format(url=self.url, id=campaign_id),
headers=self.headers, verify=self.verify)
@request_error_handler
def get_hosts(self, **kwargs):
"""
Query all hosts - all parameters are optional
:param all: if set to False, endpoint will only return hosts that have active detections, active traffic or are marked as key assets - default False
:param active_traffic: only return hosts that have seen traffic in the last 2 hours (bool)
:param c_score: certainty score (int) - will be removed with deprecation of v1 of api
:param c_score_gte: certainty score greater than or equal to (int) - will be removed with deprecation of v1 of api
:param certainty: certainty score (int)
:param certainty_gte: certainty score greater than or equal to (int)
:param fields: comma separated string of fields to be filtered and returned
possible values are: id,name,active_traffic,has_active_traffic,t_score,threat,c_score,
certainty,severity,last_source,ip,previous_ips,last_detection_timestamp,key_asset,
is_key_asset,state,targets_key_asset,is_targeting_key_asset,detection_set,
host_artifact_set,sensor,sensor_name,tags,note,note_modified_by,note_modified_timestamp,
url,host_url,last_modified,assigned_to,assigned_date,groups,has_custom_model,privilege_level,
privilege_category,probable_owner,detection_profile
:param has_active_traffic: host has active traffic (bool)
:param include_detection_summaries: include detection summary in response (bool)
:param is_key_asset: host is key asset (bool)
:param is_targeting_key_asset: host is targeting key asset (bool)
:param key_asset: host is key asset (bool) - will be removed with deprecation of v1 of api
:param last_detection_timestamp: timestamp of last detection on this host (datetime)
:param last_source: registered ip addst modified timestamp greater than or equal to (datetime)ress of host
:param mac_address: registered mac address of host
:param max_id: maximum ID of host returned
:param min_id: minimum ID of host returned
:param name: registered name of host
:param note_modified_timestamp_gte: note last modified timestamp greater than or equal to (datetime)
:param ordering: field to use to order response
:param page: page number to return (int)
:param page_size: number of object to return in repsonse (int)
:param privilege_category: privilege category of host (low/medium/high)
:param privilege_level: privilege level of host (0-10)
:param privilege_level_gte: privilege level of host greater than or equal to (int)
:param state: state of host (active/inactive)
:param t_score: threat score (int) - will be removed with deprecation of v1 of api
:param t_score_gte: threat score greater than or equal to (int) - will be removed with deprection of v1 of api
:param tags: tags assigned to host
:param targets_key_asset: host is targeting key asset (bool)
:param threat: threat score (int)
:param threat_gte: threat score greater than or equal to (int)
"""
if self.version == 2:
return requests.get('{url}/hosts'.format(url=self.url), headers=self.headers,
params=self._generate_host_params(kwargs), verify=self.verify)
else:
return requests.get('{url}/hosts'.format(url=self.url), auth=self.auth,
params=self._generate_host_params(kwargs), verify=self.verify)
def get_all_hosts(self, **kwargs):
"""
Generator to retrieve all hosts - all parameters are optional
:param all: if set to False, endpoint will only return hosts that have active detections, active traffic or are marked as key assets - default False
:param active_traffic: only return hosts that have seen traffic in the last 2 hours (bool)
:param c_score: certainty score (int) - will be removed with deprecation of v1 of api
:param c_score_gte: certainty score greater than or equal to (int) - will be removed with deprecation of v1 of api
:param certainty: certainty score (int)
:param certainty_gte: certainty score greater than or equal to (int)
:param fields: comma separated string of fields to be filtered and returned
possible values are: id,name,active_traffic,has_active_traffic,t_score,threat,c_score,
certainty,severity,last_source,ip,previous_ips,last_detection_timestamp,key_asset,
is_key_asset,state,targets_key_asset,is_targeting_key_asset,detection_set,
host_artifact_set,sensor,sensor_name,tags,note,note_modified_by,note_modified_timestamp,
url,host_url,last_modified,assigned_to,assigned_date,groups,has_custom_model,privilege_level,
privilege_category,probable_owner,detection_profile
:param has_active_traffic: host has active traffic (bool)
:param include_detection_summaries: include detection summary in response (bool)
:param is_key_asset: host is key asset (bool)
:param is_targeting_key_asset: host is targeting key asset (bool)
:param key_asset: host is key asset (bool) - will be removed with deprecation of v1 of api
:param last_detection_timestamp: timestamp of last detection on this host (datetime)
:param last_source: registered ip addst modified timestamp greater than or equal to (datetime)ress of host
:param mac_address: registered mac address of host
:param max_id: maximum ID of host returned
:param min_id: minimum ID of host returned
:param name: registered name of host
:param note_modified_timestamp_gte: note last modified timestamp greater than or equal to (datetime)
:param ordering: field to use to order response
:param page: page number to return (int)
:param page_size: number of object to return in repsonse (int)
:param privilege_category: privilege category of host (low/medium/high)
:param privilege_level: privilege level of host (0-10)
:param privilege_level_gte: privilege level of host greater than or equal to (int)
:param state: state of host (active/inactive)
:param t_score: threat score (int) - will be removed with deprecation of v1 of api
:param t_score_gte: threat score greater than or equal to (int) - will be removed with deprection of v1 of api
:param tags: tags assigned to host
:param targets_key_asset: host is targeting key asset (bool)
:param threat: threat score (int)
:param threat_gte: threat score greater than or equal to (int)
"""
resp = requests.get('{url}/hosts'.format(url=self.url), headers=self.headers,
params=self._generate_host_params(kwargs), verify=self.verify)
yield resp
while resp.json()['next']:
resp = self._get_request(url=resp.json()['next'])
yield resp
@request_error_handler
def get_host_by_id(self, host_id=None, **kwargs):
"""
Get host by id
:param host_id: host id - required
:param include_external: include fields regarding external connectors (e.g. CrowdStrike) - optional
:param include_ldap: include LDAP context pulled over AD connector - optional
:param fields: comma separated string of fields to be filtered and returned - optional
possible values are: active_traffic, assigned_date, assigned_to, c_score, campaign_summaries,
carbon_black, certainty, crowdstrike, detection_profile, detection_set, detection_summaries,
groups, has_active_traffic, has_custom_model, has_shell_knocker_learnings, host_artifact_set,
host_luid, host_session_luid, host_url, id, ip, is_key_asset, is_targeting_key_asset, key_asset,
last_detection_timestamp, last_modified, last_seen, last_source, ldap, name, note, note_modified_by,
note_modified_timestamp, previous_ips, privilege_category, privilege_level, probable_owner, sensor,
sensor_name, severity, shell_knocker, state, suspicious_admin_learnings, t_score, tags, targets_key_asset,
threat, url, vcenter
"""
if not host_id:
raise ValueError('Host id required')
if self.version == 2:
return requests.get('{url}/hosts/{id}'.format(url=self.url, id=host_id), headers=self.headers,
params=self._generate_host_by_id_params(kwargs), verify=self.verify)
else:
return requests.get('{url}/hosts/{id}'.format(url=self.url, id=host_id), auth=self.auth,
params=self._generate_host_by_id_params(kwargs), verify=self.verify)
@validate_api_v2
@request_error_handler
def set_key_asset(self, host_id=None, set=True):
"""
(Un)set host as key asset
:param host_id: id of host needing to be set - required
:param set: set flag to true if setting host as key asset
"""
if not host_id:
raise ValueError('Host id required')
if set:
payload = {'key_asset':'true'}
else:
payload = {'key_asset':'false'}
return requests.patch('{url}/hosts/{id}'.format(url=self.url, id=host_id), headers=self.headers, json=payload,
verify=self.verify)
@validate_api_v2
@request_error_handler
def get_host_tags(self, host_id=None):
"""
Get host tags
:param host_id: ID of the host for which to retrieve the tags
"""
if not host_id:
raise ValueError('Host id required')
return requests.get('{url}/tagging/host/{id}'.format(url=self.url, id=host_id), headers=self.headers,
verify=False)
@validate_api_v2
@request_error_handler
def set_host_tags(self, host_id=None, tags=[], append=False):
"""
Set host tags
:param host_id:
:param tags: list of tags to add to host
:param append: overwrites existing list if set to False, appends to existing tags if set to True
Set to empty list to clear tags (default: False)
"""
if not host_id:
raise ValueError('Host id required')
if append and type(tags) == list:
current_list = self.get_host_tags(host_id=host_id).json()['tags']
payload = {
"tags": current_list + tags
}
elif type(tags) == list:
payload = {
"tags": tags
}
else:
raise TypeError('tags must be of type list')
return requests.patch('{url}/tagging/host/{id}'.format(url=self.url, id=host_id), headers=self.headers,
json=payload, verify=self.verify)
@validate_api_v2
@request_error_handler
def bulk_set_hosts_tag(self, tag, host_ids):
"""
Set a tag in bulk on multiple hosts. Only one tag can be set at a time
:param host_ids: IDs of the hosts for which to set the tag
"""
if not isinstance(host_ids, list):
raise TypeError('Host IDs must be of type list')
payload = {
'objectIds': host_ids,
'tag': tag
}
return requests.post('{url}/tagging/host'.format(url=self.url), headers=self.headers, json=payload,
verify=False)
@validate_api_v2
@request_error_handler
def bulk_delete_hosts_tag(self, tag, host_ids):
"""
Delete a tag in bulk on multiple hosts. Only one tag can be deleted at a time
:param host_ids: IDs of the hosts on which to delete the tag
"""
if not isinstance(host_ids, list):
raise TypeError('Host IDs must be of type list')
payload = {
'objectIds': host_ids,
'tag': tag
}
return requests.delete('{url}/tagging/host'.format(url=self.url), headers=self.headers, json=payload,
verify=False)
@validate_api_v2
@request_error_handler
def get_host_note(self, host_id=None):
"""
Get host notes
:param host_id:
For consistency we return a requests.models.Response object
As we do not want to return the complete host body, we alter the response content
"""
if not host_id:
raise ValueError('Host id required')
host = requests.get('{url}/hosts/{id}'.format(url=self.url, id=host_id), headers=self.headers, verify=self.verify)
if host.status_code == 200:
host_note = host.json()['note']
# API endpoint return HTML escaped characters
host_note = html.unescape(host_note) if host_note else ''
json_dict = {'status': 'success', 'host_id': str(host_id), 'note': host_note}
host._content = json.dumps(json_dict).encode('utf-8')
return host
@validate_api_v2
@request_error_handler
def set_host_note(self, host_id=None, note='', append=False):
"""
Set host note
:param host_id:
:param note: content of the note to set
:param append: overwrites existing note if set to False, appends if set to True
Set to empty note string to clear host note
"""
if not host_id:
raise ValueError('Host id required')
if append and isinstance(note, str):
current_note = self.get_host_note(host_id=host_id).json()['note']
if current_note:
if len(note) > 0:
payload = {
"note": '{}{}{}'.format(current_note, '\n', note)
}
else:
payload = {
"note": current_note
}
else:
payload = {
"note": note
}
elif isinstance(note, str):
payload = {
"note": note
}
else:
raise TypeError('Note must be of type str')
return requests.patch('{url}/hosts/{id}'.format(url=self.url, id=host_id), headers=self.headers, data=json.dumps(payload),
verify=self.verify)
@request_error_handler
def get_detections(self, **kwargs):
"""
Query all detections - all parameters are optional
:param c_score: certainty score (int) - will be removed with deprecation of v1 of api
:param c_score_gte: certainty score greater than or equal to (int) - will be removed with deprecation of v1 of api
:param category: detection category - will be removed with deprecation of v1 of api
:param certainty: certainty score (int)
:param certainty_gte: certainty score greater than or equal to (int)
:param detection: detection type
:param detection_type: detection type
:param detection_category: detection category
:param description:
:param fields: comma separated string of fields to be filtered and returned
possible values are: id, url, detection_url, category, detection, detection_category,
detection_type, custom_detection, description, src_ip, state, t_score, c_score,
certainty, threat, first_timestamp, last_timestamp, targets_key_asset,
is_targeting_key_asset, src_account, src_host, note, note_modified_by,
note_modified_timestamp, sensor, sensor_name, tags, triage_rule_id, assigned_to,
assigned_date, groups, is_marked_custom, is_custom_model
:param host_id: host id (int)
:param is_targeting_key_asset: detection is targeting key asset (bool)
:param is_triaged: detection is triaged
:param last_timestamp: timestamp of last activity on detection (datetime)
:param max_id: maximum ID of detection returned
:param min_id: minimum ID of detection returned
:param ordering: field used to sort response
:param page: page number to return (int)
:param page_size: number of object to return in repsonse (int)
:param src_ip: source ip address of host attributed to detection
:param state: state of detection (active/inactive)
:param t_score: threat score (int) - will be removed with deprecation of v1 of api
:param t_score_gte: threat score is greater than or equal to (int) - will be removed with deprecation of v1 of api
:param tags: tags assigned to detections; this uses substring matching
:param targets_key_asset: detection targets key asset (bool) - will be removed with deprecation of v1 of api
:param threat: threat score (int)
:param threat_gte threat score is greater than or equal to (int)
:param note_modified_timestamp_gte: note last modified timestamp greater than or equal to (datetime)
"""
if self.version == 2:
return requests.get('{url}/detections'.format(url=self.url), headers=self.headers,
params=self._generate_detection_params(kwargs), verify=self.verify)
else:
return requests.get('{url}/detections'.format(url=self.url), auth=self.auth,
params=self._generate_detection_params(kwargs), verify=self.verify)
def get_all_detections(self, **kwargs):
"""
Generator to retrieve all detections - all parameters are optional
:param c_score: certainty score (int) - will be removed with deprecation of v1 of api
:param c_score_gte: certainty score greater than or equal to (int) - will be removed with deprecation of v1 of api
:param category: detection category - will be removed with deprecation of v1 of api
:param certainty: certainty score (int)
:param certainty_gte: certainty score greater than or equal to (int)
:param detection: detection type
:param detection_type: detection type
:param detection_category: detection category
:param description:
:param fields: comma separated string of fields to be filtered and returned
possible values are: id, url, detection_url, category, detection, detection_category,
detection_type, custom_detection, description, src_ip, state, t_score, c_score,
certainty, threat, first_timestamp, last_timestamp, targets_key_asset,
is_targeting_key_asset, src_account, src_host, note, note_modified_by,
note_modified_timestamp, sensor, sensor_name, tags, triage_rule_id, assigned_to,
assigned_date, groups, is_marked_custom, is_custom_model
:param host_id: detection id (int)
:param is_targeting_key_asset: detection is targeting key asset (bool)
:param is_triaged: detection is triaged
:param last_timestamp: timestamp of last activity on detection (datetime)
:param max_id: maximum ID of detection returned
:param min_id: minimum ID of detection returned
:param ordering: field used to sort response
:param page: page number to return (int)
:param page_size: number of object to return in repsonse (int)
:param src_ip: source ip address of host attributed to detection
:param state: state of detection (active/inactive)
:param t_score: threat score (int) - will be removed with deprecation of v1 of api
:param t_score_gte: threat score is greater than or equal to (int) - will be removed with deprecation of v1 of api
:param tags: tags assigned to detection; this uses substring matching
:param targets_key_asset: detection targets key asset (bool) - will be removed with deprecation of v1 of api
:param threat: threat score (int)
:param threat_gte threat score is greater than or equal to (int)
:param note_modified_timestamp_gte: note last modified timestamp greater than or equal to (datetime)
"""
resp = requests.get('{url}/detections'.format(url=self.url), headers=self.headers,
params=self._generate_detection_params(kwargs), verify=self.verify)
yield resp
while resp.json()['next']:
resp = self._get_request(url = resp.json()['next'])
yield resp
@request_error_handler
def get_detection_by_id(self, detection_id=None, **kwargs):
"""
Get detection by id
:param detection_id: detection id - required
:param fields: comma separated string of fields to be filtered and returned - optional
possible values are: id, url, detection_url, category, detection, detection_category,
detection_type, custom_detection, description, src_ip, state, t_score, c_score,
certainty, threat, first_timestamp, last_timestamp, targets_key_asset,
is_targeting_key_asset, src_account, src_host, note, note_modified_by,
note_modified_timestamp, sensor, sensor_name, tags, triage_rule_id, assigned_to,
assigned_date, groups, is_marked_custom, is_custom_model
"""
if not detection_id:
raise ValueError('Detection id required')
if self.version == 2:
return requests.get('{url}/detections/{id}'.format(url=self.url, id=detection_id), headers=self.headers,
params=self._generate_detection_params(kwargs), verify=self.verify)
else:
return requests.get('{url}/detections/{id}'.format(url=self.url, id=detection_id), auth=self.auth,
params=self._generate_detection_params(kwargs), verify=self.verify)
@validate_api_v2
@request_error_handler
def mark_detections_fixed(self, detection_ids=None):
"""
Mark detections as fixed
:param detection_ids: list of detections to mark as fixed
"""
if not isinstance(detection_ids, list):
raise ValueError('Must provide a list of detection IDs to mark as fixed')
return self._toggle_detections_fixed(detection_ids, fixed=True)
@validate_api_v2
@request_error_handler
def unmark_detections_fixed(self, detection_ids=None):
"""
Unmark detections as fixed
:param detection_ids: list of detections to unmark as fixed
"""
if not isinstance(detection_ids, list):
raise ValueError('Must provide a list of detection IDs to unmark as fixed')
return self._toggle_detections_fixed(detection_ids, fixed=False)
def _toggle_detections_fixed(self, detection_ids, fixed):
"""
Internal function to mark/unmark detections as fixed
"""
payload = {
'detectionIdList': detection_ids,
'mark_as_fixed': str(fixed)
}
return requests.patch('{url}/detections'.format(url=self.url), json=payload, headers=self.headers,
verify=self.verify)
@validate_api_v2
@request_error_handler
def mark_detections_custom(self, detection_ids=[], triage_category=None):
"""
Mark detections as custom
:param detection_ids: list of detection IDs to mark as custom
:param triage_category: custom name to give detection
:rtype: requests.Response
"""
if not isinstance(detection_ids, list):
raise ValueError('Must provide a list of detection IDs to mark as custom')
payload = {
"triage_category": triage_category,
"detectionIdList": detection_ids
}
return requests.post('{url}/rules'.format(url=self.url), headers=self.headers, json=payload,
verify=self.verify)
@validate_api_v2
@request_error_handler
def unmark_detections_custom(self, detection_ids=[]):
"""
Unmark detection as custom
:param detection_ids: list of detection IDs to unmark as custom
:rtype: requests.Response
"""
if not isinstance(detection_ids, list):
raise ValueError('Must provide a list of detection IDs to unmark as custom')
payload = {
"detectionIdList": detection_ids
}
response = requests.delete('{url}/rules'.format(url=self.url), headers=self.headers, json=payload,
verify=self.verify)
# DELETE returns an empty response, but we populate the response for consistency with the mark_as_fixed() function
json_dict = {'_meta': {'message': 'Successfully unmarked detections', 'level': 'Success'}}
response._content = json.dumps(json_dict).encode('utf-8')
return response
@validate_api_v2
@request_error_handler
def get_detection_tags(self, detection_id=None):
"""
Get detection tags
:param detection_id:
"""
return requests.get('{url}/tagging/detection/{id}'.format(url=self.url, id=detection_id), headers=self.headers,
verify=False)
@validate_api_v2
@request_error_handler
def set_detection_tags(self, detection_id=None, tags=[], append=False):
"""
Set detection tags
:param detection_id:
:param tags: list of tags to add to detection
:param append: overwrites existing list if set to False, appends to existing tags if set to True
Set to empty list to clear all tags (default: False)
"""
if append and type(tags) == list:
current_list = self.get_detection_tags(detection_id=detection_id).json()['tags']
payload = {
"tags": current_list + tags
}
elif type(tags) == list:
payload = {
"tags": tags
}
else:
raise TypeError('tags must be of type list')
return requests.patch('{url}/tagging/detection/{id}'.format(url=self.url, id=detection_id), headers=self.headers,
json=payload, verify=self.verify)
@validate_api_v2
@request_error_handler
def bulk_set_detections_tag(self, tag, detection_ids):
"""
Set a tag in bulk on multiple detections. Only one tag can be set at a time
:param detection_ids: IDs of the detections for which to set the tag
"""
if not isinstance(detection_ids, list):
raise TypeError('Detection IDs must be of type list')
payload = {
'objectIds': detection_ids,
'tag': tag
}
return requests.post('{url}/tagging/detection'.format(url=self.url), headers=self.headers, json=payload,
verify=False)
@validate_api_v2
@request_error_handler
def bulk_delete_detections_tag(self, tag, detection_ids):
"""
Delete a tag in bulk on multiple detections. Only one tag can be deleted at a time
:param detection_ids: IDs of the detections for which to delete the tag
"""
if not isinstance(detection_ids, list):
raise TypeError('Detection IDs must be of type list')
payload = {
'objectIds': detection_ids,
'tag': tag
}
return requests.delete('{url}/tagging/detection'.format(url=self.url), headers=self.headers, json=payload,
verify=False)
@validate_api_v2
@request_error_handler
def get_detection_note(self, detection_id=None):
"""
Get detection notes
:param detection_id:
For consistency we return a requests.models.Response object
As we do not want to return the complete detection body, we alter the response content
"""
detection = requests.get('{url}/detections/{id}'.format(url=self.url, id=detection_id), headers=self.headers, verify=self.verify)
if detection.status_code == 200:
detection_note = detection.json()['note']
# API endpoint return HTML escaped characters
detection_note = html.unescape(detection_note) if detection_note else ''
json_dict = {'status': 'success', 'detection_id': str(detection_id), 'note': detection_note}
detection._content = json.dumps(json_dict).encode('utf-8')
return detection
@validate_api_v2
@request_error_handler
def set_detection_note(self, detection_id=None, note='', append=False):
"""
Set detection note
:param detection_id:
:param note: content of the note to set
:param append: overwrites existing note if set to False, appends if set to True
Set to empty note string to clear detection note
"""
if append and isinstance(note, str):
current_note = self.get_detection_note(detection_id=detection_id).json()['note']
if current_note:
if len(note) > 0:
payload = {
"note": '{}{}{}'.format(current_note, '\n', note)
}
else:
payload = {
"note": current_note
}
else:
payload = {
"note": note
}
elif isinstance(note, str):
payload = {
"note": note
}
else:
raise TypeError('Note must be of type str')
return requests.patch('{url}/detections/{id}'.format(url=self.url, id=detection_id), headers=self.headers, json=payload,
verify=self.verify)
@validate_api_v2
def get_detection_pcap(self, detection_id=None, filename=None):
"""
Get detection pcap
:param detection_id: ID of the detection for which to get a pcap
:param filename: filename to write the pcap to. Will be overwriten if already exists.
"""
response = requests.get('{url}/detections/{id}/pcap'.format(url=self.url, id=detection_id), headers=self.headers,
verify=False)
if response.status_code not in [200, 201, 204]:
raise HTTPException(response)
with open(filename, 'wb') as f:
f.write(response.content)
# Return a <Response> object for consistency
json_dict = {'status': 'success', 'detection_id': str(detection_id), 'file_created': filename}
response._content = json.dumps(json_dict).encode('utf-8')
return response
# TODO add request_error_handler decorator as soon as get_rules_by_name() returns requests.Response object
@validate_api_v2
def get_rules(self, name=None, rule_id=None, **kwargs):
"""
Query all rules
:param name: name of rule to search (substring matching)
:param rule_id: ID of rule to return
:param contains:
:param fields: comma separated string of fields to be filtered and returned
possible values are: active_detections, all_hosts, category, created_timestamp, description,
enabled, flex1, flex2, flex3, flex4, flex5, flex6, host, host_group, id, identity, ip,
ip_group, is_whitelist, last_timestamp, priority, remote1_dns, remote1_dns_groups,
remote1_ip, remote1_ip_groups, remote1_kerb_account, remote1_kerb_service, remote1_port,
remote1_proto, remote2_dns, remote2_dns_groups, remote2_ip, remote2_ip_groups, remote2_port,
remote2_proto, sensor_luid, smart_category, template, total_detections, type_vname, url
:param include_templates: include rule templates, default is False
:param ordering: field used to sort response
:param page: page number to return (int)
:param page_size: number of object to return in repsonse (int)