-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathvaronissaas_tools.py
258 lines (201 loc) · 7.22 KB
/
varonissaas_tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# File: varonissaas_tools.py
#
# Copyright (c) Varonis, 2024
#
# This unpublished material is proprietary to Varonis SaaS. All
# rights reserved. The methods and techniques described herein are
# considered trade secrets and/or confidential. Reproduction or
# distribution, in whole or in part, is forbidden except by express
# written permission of Varonis SaaS.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific language governing permissions
# and limitations under the License.
from collections import deque
from datetime import datetime, timezone
from itertools import groupby, islice
from typing import Any, Dict, List, Optional
import dateparser
def try_convert(item, converter, error=None):
    """Run ``converter`` on ``item`` unless the item is falsy.

    :type item: ``Any``
    :param item: The value to convert

    :type converter: ``Any``
    :param converter: Callable applied to ``item``

    :type error: ``Any``
    :param error: Exception raised in place of the converter's own exception
        when the conversion fails; if omitted, the original exception propagates

    :return: The converted value, or ``None`` when ``item`` is falsy
    :rtype: ``Any``
    """
    # Falsy items (None, "", 0, empty containers) never reach the converter.
    if not item:
        return None

    try:
        converted = converter(item)
    except Exception:
        if not error:
            raise
        raise error
    return converted
def urljoin(url, suffix):
    """Concatenate ``url`` and ``suffix`` with a slash between them.

    :type url: ``str``
    :param url: Base URL; a trailing ``/`` is appended when it is missing

    :type suffix: ``str``
    :param suffix: Path to append; a single leading ``/`` is dropped if present

    :return: The joined URL
    :rtype: ``str``
    """
    base = url if url[-1:] == "/" else url + "/"
    # Only one leading slash is removed; any further slashes are kept.
    tail = suffix[1:] if suffix.startswith("/") else suffix
    return base + tail
def arg_to_datetime(interval: Any, is_utc=True) -> Optional[datetime]:
    """Convert a timestamp or a date string to a datetime.

    :type interval: ``Any``
    :param interval: value to convert: ``None``, a numeric epoch timestamp
        (``int``/``float`` or a ``str`` made only of digits), or a date string
        handed to ``dateparser`` (for example ``2019-10-23T00:00:00`` or ``3 days``)

    :type is_utc: ``bool``
    :param is_utc: for numeric timestamps only: if True the result is a
        timezone-aware UTC datetime, otherwise a naive datetime in local time.

    :return:
        returns a ``datetime`` if conversion works
        returns ``None`` if interval is ``None``
        otherwise throws a ``ValueError``
    :rtype: ``Optional[datetime]``
    """
    if interval is None:
        return None

    if isinstance(interval, (int, float)) or (isinstance(interval, str) and interval.isdigit()):
        seconds = float(interval)
        if seconds > 2000000000.0:
            # Values this large are taken to be unix time in milliseconds.
            seconds = seconds / 1000.0

        if is_utc:
            # datetime.utcfromtimestamp() is deprecated since Python 3.12;
            # passing tz= yields the same aware UTC value directly.
            return datetime.fromtimestamp(seconds, tz=timezone.utc)
        return datetime.fromtimestamp(seconds)

    if isinstance(interval, str):
        # dateparser handles strings either in ISO8601 format or as relative
        # time stamps, e.g. 2019-10-23T00:00:00 or "3 days".
        # NOTE(review): is_utc is not consulted on this path - confirm intended.
        date = dateparser.parse(interval, settings={"TIMEZONE": "UTC"})
        if date is None:
            # None means dateparser failed to parse the string
            raise ValueError('"{}" is not a valid date'.format(interval))
        return date

    # Any other type (list, dict, ...) is not convertible.
    raise ValueError('"{}" is not a valid date+'.format(interval))
def numeric_to_date(interval: Any, is_utc=True) -> Optional[datetime]:
    """Convert ``interval`` to a datetime, reading digit-only strings as day counts.

    :type interval: ``Any``
    :param interval: value to convert; a ``str`` made only of digits is turned
        into the text ``"<interval> day"`` before conversion, anything else is
        passed through unchanged

    :type is_utc: ``bool``
    :param is_utc: forwarded to ``arg_to_datetime``

    :return: whatever ``arg_to_datetime`` returns for the resulting value
    :rtype: ``Optional[datetime]``
    """
    is_day_count = isinstance(interval, str) and interval.isdigit()
    value = f"{interval} day" if is_day_count else interval
    return arg_to_datetime(value, is_utc)
def multi_value_to_string_list(arg, separator=","):
    """
    Normalise a multi-value argument into a python list

    :type arg: ``str`` or ``list``
    :param arg: Value to normalise (required)

    :type separator: ``str``
    :param separator: Delimiter used to split a string argument, a comma by default.

    :return: ``[]`` for a falsy argument, the same list object for a list,
        the stripped pieces for a string, otherwise a one-element list
    :rtype: ``list``
    """
    if not arg:
        return []

    if isinstance(arg, str):
        # Empty pieces (e.g. from "a,,b") are kept as empty strings.
        return list(map(str.strip, arg.split(separator)))

    return arg if isinstance(arg, list) else [arg]
def strEqual(text1: str, text2: str) -> bool:
    """Compare two strings case-insensitively, treating falsy values as equal.

    :type text1: ``str``
    :param text1: First string (may be ``None`` or empty)

    :type text2: ``str``
    :param text2: Second string (may be ``None`` or empty)

    :return: True when both are falsy or their casefolded forms match,
        False otherwise
    :rtype: ``bool``
    """
    if text1 and text2:
        return text1.casefold() == text2.casefold()
    # At least one side is falsy: equal only when both are.
    return not text1 and not text2
def parse_bool(value: str) -> Optional[bool]:
    """Translate a yes/no, true/false or 1/0 string into a boolean.

    :type value: ``str``
    :param value: Text to interpret; matching is case-insensitive

    :return: ``True`` or ``False`` for a recognised word, ``None`` for a falsy
        or unrecognised value
    :rtype: ``Optional[bool]``
    """
    if not value:
        return None

    recognised = {
        "yes": True,
        "true": True,
        "1": True,
        "no": False,
        "false": False,
        "0": False,
    }
    return recognised.get(value.lower())
def parse_bool_list(value: str) -> str:
    """Apply ``parse_bool`` to every comma-separated item of a string.

    :type value: ``str``
    :param value: Comma-separated items; each one is stripped before parsing

    :return: The ``str()`` of each parsed item joined by commas (an item that
        ``parse_bool`` maps to ``None`` becomes the text ``None``), or ``None``
        when ``value`` is falsy
    :rtype: ``str``
    """
    if not value:
        return None
    return ",".join(str(parse_bool(item.strip())) for item in value.split(sep=","))
def convert_json_to_key_value(data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Turn a columns/rows table into a list of per-row dictionaries.

    :type data: ``Dict[str, Any]``
    :param data: Mapping with a ``rows`` entry (sequence of rows) and a
        ``columns`` entry (sequence of column names)

    :return: One dictionary per row, pairing column names with row values by
        position; pairing stops at the shorter of the two sequences
    :rtype: ``List[Dict[str, Any]]``
    """
    # data["columns"] is looked up per row, so it is never touched when there are no rows.
    return [dict(zip(data["columns"], row)) for row in data["rows"]]
def new_construct(obj: Any) -> Any:
    """Create an empty container matching the kind of ``obj``.

    :type obj: ``Any``
    :param obj: Value whose kind decides the container

    :return: ``None`` for ``None``, a new empty list for a list, and a new
        empty dict for anything else
    :rtype: ``Any``
    """
    if obj is None:
        return None
    if isinstance(obj, list):
        return []
    return {}
def isliteral(obj: Any):
    """Tell whether ``obj`` is an ``int``, ``float``, ``str`` or ``bool``.

    :type obj: ``Any``
    :param obj: Value to check

    :return: True for an instance of one of those types, False otherwise
    :rtype: ``bool``
    """
    literal_types = (int, float, str, bool)
    return isinstance(obj, literal_types)
def object_to_dict(obj):
    """Copy an object graph into plain nested lists and dicts.

    Objects that have a ``__dict__`` are replaced by their ``vars()``; lists and
    dicts are walked element by element. Literal values (see ``isliteral``) are
    copied as-is and ``datetime`` values are written as strings in the
    ``%Y-%m-%dT%H:%M:%S.%f%z`` format. Each source object is expanded only once
    (tracked by ``id``), so a second reference to an already-visited object is
    left as the empty container created for it.

    :type obj: ``Any``
    :param obj: Root object to convert

    :return: The container created by ``new_construct`` for ``obj``, filled in
        during the walk
    :rtype: ``Any``
    """
    result = new_construct(obj)
    # Each work item is (id of the source object, the source object, the output container to fill).
    queue = deque([(id(obj), obj, result)])
    # ids of source objects that were already expanded.
    processed = set()
    while queue:
        # pop() takes from the right end, so the most recently queued item is handled first.
        obj_id, obj, constructed_obj = queue.pop()
        if obj_id in processed:
            # Already expanded once; this also stops reference cycles from looping forever.
            continue
        processed.add(obj_id)
        if hasattr(obj, "__dict__"):
            # Treat the instance as the dict of its attributes.
            obj = vars(obj)
        if isinstance(obj, list):
            for val in obj:
                if isliteral(val):
                    constructed_obj.append(val)
                elif isinstance(val, datetime):
                    constructed_obj.append(val.strftime("%Y-%m-%dT%H:%M:%S.%f%z"))
                else:
                    # Put an empty placeholder in the output now and fill it when the item is popped.
                    new_obj = new_construct(val)
                    queue.append((id(val), val, new_obj))
                    constructed_obj.append(new_obj)
        elif isinstance(obj, dict):
            for key, val in obj.items():
                if isliteral(val):
                    constructed_obj[key] = val
                elif isinstance(val, datetime):
                    constructed_obj[key] = val.strftime("%Y-%m-%dT%H:%M:%S.%f%z")
                else:
                    # Same placeholder approach as for list elements.
                    new_obj = new_construct(val)
                    constructed_obj[key] = new_obj
                    queue.append((id(val), val, new_obj))
        # Anything else (no __dict__, not a list or dict) leaves its placeholder untouched.
    return result
def convert_level(level: str, levels: List[str]) -> List[str]:
    """Return the part of ``levels`` that starts at ``level``.

    :type level: ``str``
    :param level: Level to start from; ``None`` yields an empty list

    :type levels: ``List[str]``
    :param levels: Ordered list of known levels; it is not modified

    :return: A new list holding ``level`` (its first occurrence) and every
        entry after it, or an empty list when ``level`` is ``None`` or is not
        found in ``levels``
    :rtype: ``List[str]``
    """
    if level is None:
        return []

    # One scan plus a slice replaces copying the list and calling remove()
    # for every skipped entry, which was quadratic in len(levels).
    for index, candidate in enumerate(levels):
        if level == candidate:
            return levels[index:]
    return []
def group_by(data: List[Any], key_func: Any) -> Dict[str, List[Any]]:
    """Bucket the items of ``data`` by the value ``key_func`` returns for them.

    :type data: ``List[Any]``
    :param data: Items to group; the input itself is not modified

    :type key_func: ``Any``
    :param key_func: Callable producing the grouping key of an item

    :return: Mapping from each key to the list of items that produced it
    :rtype: ``Dict[str, List[Any]]``
    """
    # groupby only merges adjacent items, so the data is sorted by the same key first.
    ordered = sorted(data, key=key_func)
    return {key: list(members) for key, members in groupby(ordered, key=key_func)}
def batched(iterable, n):
    """Yield the items of ``iterable`` in lists of at most ``n`` elements.

    :type iterable: ``Any``
    :param iterable: Source of items

    :type n: ``int``
    :param n: Maximum batch size; a value below 1 produces no batches

    :return: A generator of lists; the last one may hold fewer than ``n`` items
    :rtype: ``Generator``
    """
    if n < 1:
        # This is a generator function, so returning here simply ends the
        # iteration without producing any batch.
        return []

    source = iter(iterable)
    # iter(callable, sentinel) keeps calling until an empty list comes back,
    # i.e. until the source is exhausted.
    yield from iter(lambda: list(islice(source, n)), [])