-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathOneflowSDK.py
60 lines (53 loc) · 2.06 KB
/
OneflowSDK.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#!/usr/bin/python
__author__ = "HPInc."
import requests, hmac, hashlib, datetime
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
from urllib.parse import unquote
class OneflowSDK:
def __init__(self, url, token, secret, options=None):
self.url = url
self.token = token
self.secret = secret
self.options = options
def requests_retry_session(self):
retries = getattr(self.options, "retries", 3)
retryFactor = getattr(self.options, "retryFactor", 0.3 * 60)
retryStatus = getattr(self.options, "retryStatus", [429])
retryMethods = getattr(
self.options,
"retryMethods",
["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE", "POST"],
)
session = requests.Session()
retry = Retry(
total=retries,
read=retries,
connect=retries,
backoff_factor=retryFactor,
status_forcelist=retryStatus,
allowed_methods=retryMethods,
)
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
def create_headers(self, method, path):
timestamp = datetime.datetime.now().isoformat()
string_to_sign = method.upper() + " " + unquote(path) + " " + timestamp
signature = hmac.new(
bytes(self.secret, "utf-8"), bytes(string_to_sign, "utf-8"), hashlib.sha256
).hexdigest()
oneflow_auth = self.token + ":" + signature
return {
"content-type": "application/json",
"x-oneflow-algorithm": "SHA256",
"x-oneflow-authorization": oneflow_auth,
"x-oneflow-date": timestamp,
}
def request(self, method, path, data=None, params=None):
url = self.url + path
headers = self.create_headers(method, path)
session = self.requests_retry_session()
result = session.request(method, url, data=data, params=params, headers=headers)
return result.content