-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRestAuthController.py
112 lines (80 loc) · 2.98 KB
/
RestAuthController.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import json
from cachetools import TTLCache
import requests
import datetime
import time
import ssl
import sys
import os
import urllib.parse
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Import Config
import Config as config
class RestAuthController():
    """Obtain and cache an OAuth2 access token for the configured Learn REST API.

    Settings are read from ``Config.tool_config`` (``LEARN_REST_KEY``,
    ``LEARN_REST_SECRET``, ``LEARN_REST_URL``, ``APP_URL``, ``VERIFY_CERTS``).
    The token is kept in a single-entry ``TTLCache`` whose time-to-live is the
    ``expires_in`` value returned by the token endpoint, so an expired token
    simply disappears from the cache and is requested again on next use.
    """

    target_url = ''

    def __init__(self, authcode=None):
        """Read configuration and choose the OAuth2 grant type.

        Args:
            authcode: Authorization code to exchange for a token. When given,
                the ``authorization_code`` grant is used; when ``None``, the
                ``client_credentials`` grant is used.
        """
        self.KEY = config.tool_config['LEARN_REST_KEY']
        self.SECRET = config.tool_config['LEARN_REST_SECRET']

        if authcode is not None:
            self.CREDENTIALS = 'authorization_code'
        else:
            self.CREDENTIALS = 'client_credentials'

        # Form body sent with the token request.
        self.PAYLOAD = {
            'grant_type': self.CREDENTIALS
        }
        self.TOKEN = None
        self.target_url = config.tool_config['LEARN_REST_URL']
        self.app_url = config.tool_config['APP_URL']
        self.authcode = authcode
        self.EXPIRES_AT = ''

        # The setting is stored as text; only the exact string 'True' enables
        # certificate verification.
        self.verify_certs = config.tool_config['VERIFY_CERTS'] == 'True'

        # Populated by requestToken() after a successful token response.
        self.cache = None
        self.uuid = None

    def getKey(self):
        """Return the configured REST application key."""
        return self.KEY

    def getSecret(self):
        """Return the configured REST application secret."""
        return self.SECRET

    def setToken(self):
        """Ensure a token is cached, requesting a new one when necessary.

        A request is made when no cache exists yet or when the cached token
        is missing (for example because its TTL has elapsed).
        """
        if self.cache is not None:
            # Only the lookup is guarded, so a KeyError raised while handling
            # a token response can never trigger a second request.
            try:
                self.cache['token']
            except KeyError:
                pass
            else:
                return
        self.requestToken()

    def _buildTokenUrl(self):
        """Return the token endpoint URL for the selected grant type.

        For the ``authorization_code`` grant the code and redirect URI are
        appended as a URL-encoded query string, so reserved characters in
        either value cannot corrupt the request.
        """
        url = f"https://{self.target_url}/learn/api/public/v1/oauth2/token"
        if self.CREDENTIALS == 'authorization_code':
            query = urllib.parse.urlencode({
                'code': self.authcode,
                'redirect_uri': f"{self.app_url}/authcode/",
            })
            url += f"?{query}"
        return url

    def requestToken(self):
        """POST to the token endpoint and cache the returned access token.

        On a 200 response the access token is stored in a fresh ``TTLCache``
        and, when the response carries a ``user_id``, that value is stored as
        the UUID. On any other status, or when the body is not valid JSON, an
        error is printed and the cache is left unchanged.

        Raises:
            KeyError: If a 200 response lacks ``expires_in`` or
                ``access_token``.

        Exceptions raised by ``requests.post`` itself are not caught here.
        """
        OAUTH_URL = self._buildTokenUrl()

        # Authenticate
        # NOTE(review): the URL may contain the authorization code, and the
        # response/token prints below expose credentials in the process output.
        print("[auth:setToken] POST Request URL: " + OAUTH_URL)
        print("[auth:setToken] JSON Payload: \n" + json.dumps(self.PAYLOAD, indent=4, separators=(',', ': ')))

        r = requests.post(OAUTH_URL, data=self.PAYLOAD, auth=(self.KEY, self.SECRET), verify=self.verify_certs)

        print("[auth:setToken()] STATUS CODE: " + str(r.status_code))

        # Parse once; an error page that is not JSON is reported instead of
        # raising out of the decoder.
        try:
            parsed_json = json.loads(r.text)
        except ValueError:
            print("[auth:setToken()] RESPONSE: \n" + r.text)
            print("[auth:setToken()] ERROR")
            return

        print("[auth:setToken()] RESPONSE: \n" + json.dumps(parsed_json, indent=4, separators=(',', ': ')))

        if r.status_code != 200:
            print("[auth:setToken()] ERROR")
            return

        access_token = parsed_json['access_token']
        self.cache = TTLCache(maxsize=1, ttl=parsed_json['expires_in'])
        self.cache['token'] = access_token

        # Check the same key that is read.
        if 'user_id' in parsed_json:
            self.uuid = parsed_json['user_id']

        # Print the local value rather than calling getToken(), which would
        # re-enter requestToken() if the cache entry had already expired.
        print("[auth:setToken()] TOKEN: " + access_token)

    def getToken(self):
        """Return the cached access token, requesting one first if needed.

        Raises:
            TypeError: If no token could be obtained and no cache exists.
            KeyError: If no token could be obtained and the cached entry has
                expired.
        """
        try:
            return self.cache['token']
        except (TypeError, KeyError):
            # TypeError: cache is still None; KeyError: entry missing/expired.
            self.setToken()
            return self.cache['token']

    def getUuid(self):
        """Return the user id stored from the last token response, or None."""
        return self.uuid