-
Notifications
You must be signed in to change notification settings - Fork 5
/
auth.py
165 lines (136 loc) · 7.7 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
"""
Copyright (C) 2016, Blackboard Inc.
All rights reserved.
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
Neither the name of Blackboard Inc. nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY BLACKBOARD INC ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL BLACKBOARD INC. BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
import json
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.poolmanager import PoolManager
import datetime
import time
#from datetime import now
import ssl
import sys
#Tls1Adapter allows for connection to sites with non-CA/self-signed
# certificates e.g.: Learn Dev VM
class Tls1Adapter(HTTPAdapter):
def init_poolmanager(self, connections, maxsize, block=False):
self.poolmanager = PoolManager(num_pools=connections,
maxsize=maxsize,
block=block,
ssl_version=ssl.PROTOCOL_TLSv1)
class AuthToken():
target_url = ''
def __init__(self, URL):
self.SECRET = "biExBJNI1IXiBBXpV8g01JJJjmXKHSg7" #Example Only. Change to your secret
self.KEY = "9cb9384a-3662-410d-9953-fe73cc374b81"#Example Only. Change to your key
self.CREDENTIALS = 'client_credentials'
self.PAYLOAD = {
'grant_type':'client_credentials'
}
self.TOKEN = None
self.target_url = URL
self.EXPIRES_AT = ''
def getKey(self):
return self.KEY
def getSecret(self):
return self.SECRET
def setToken(self):
oauth_path = '/learn/api/public/v1/oauth2/token'
OAUTH_URL = 'https://' + self.target_url + oauth_path
if self.TOKEN is None:
#session = requests.session()
#session.mount('https://', Tls1Adapter()) # remove for production
# Authenticate
#print("[auth:setToken] POST Request URL: " + OAUTH_URL)
#print("[auth:setToken] JSON Payload: \n" + json.dumps(self.PAYLOAD, indent=4, separators=(',', ': ')))
try:
r = requests.post(OAUTH_URL, data=self.PAYLOAD, auth=(self.KEY, self.SECRET), verify=False)
except requests.HTTPError as err:
if err.code == 400: # 400 Invalid access token request.
print("[auth:setToken] Error 400 Invalid access token request.\n")
elif err.code == 401: # 401 Invalid client credentials, or no access granted to this Learn server.
print("[auth:setToken] Error 404 Invalid client credentials, or no access granted to this Learn server.\n")
sys.exit(2)
elif err.code == 429:
print("[auth:setToken] Error 429 Too Many Requests.")
elif err.code == LocationParseError:
print("[auth:setToken] Error LocationParseError\n")
sys.exit(2)
except requests.RequestException:
print("[auth:setToken] Error cannot connect to requested server. Exiting.\n")
sys.exit(2)
#print("[auth:setToken()] STATUS CODE: " + str(r.status_code) )
#strip quotes from result for better dumps
res = json.loads(r.text)
#print("[auth:setToken()] RESPONSE: \n" + json.dumps(res,indent=4, separators=(',', ': ')))
if r.status_code == 200:
parsed_json = json.loads(r.text)
self.TOKEN = parsed_json['access_token']
self.EXPIRES = parsed_json['expires_in']
m, s = divmod(self.EXPIRES, 60)
#h, m = divmod(m, 60)
#print "%d:%02d:%02d" % (h, m, s)
self.NOW = datetime.datetime.now()
self.EXPIRES_AT = self.NOW + datetime.timedelta(seconds = s, minutes = m)
#print ("[auth:setToken()] Token Expires at " + self.EXPIRES_AT.strftime("%H:%M:%S"))
#print ("[auth:setToken()] TOKEN: " + self.TOKEN)
#there is the possibility the reaquired token may expire
#before we are done so perform expiration sanity check...
if self.isExpired(self.EXPIRES_AT):
self.setToken()
else:
print("[auth:setToken()] ERROR")
#else:
#print ("[auth:setToken()] TOKEN set")
def getToken(self):
#if token time is less than a one second then
# print that we are pausing to clear
# re-auth and return the new token
if self.isExpired(self.EXPIRES_AT):
self.setToken()
return self.TOKEN
def revokeToken(self):
revoke_path = '/learn/api/public/v1/oauth2/revoke'
revoke_URL = 'https://' + self.target_url + revoke_path
print("[auth:revokeToken()] KEY: " + self.KEY)
print("[auth:revokeToken()] SECRET: " + self.SECRET)
print("[auth:revokeToken()] TOKEN: " + self.TOKEN)
print("[auth:revokeToken()] revoke_URL: " + revoke_URL)
self.PAYLOAD = {
'token':self.TOKEN
}
if self.TOKEN != '':
print("[auth:revokeToken()] TOKEN not empty...able to revoke")
print("[auth:revokeToken()] POST PAYLOAD: ")
for keys,values in self.PAYLOAD.items():
print("\t\t\t" + keys + ":" + values)
#session = requests.session()
#session.mount('https://', Tls1Adapter()) # remove for production
# revoke token
print("[auth:revokeToken] Request URL: " + revoke_URL)
print("[auth:revokeToken] JSON Payload: \n " + json.dumps(self.PAYLOAD, indent=4, separators=(',', ': ')))
r = requests.post(revoke_URL, data=self.PAYLOAD, auth=(self.KEY, self.SECRET), verify=False)
print("[auth:revokeToken()] STATUS CODE: " + str(r.status_code) )
print("[auth:revokeToken()] RESPONSE: " + r.text)
if r.status_code == 200:
print("[auth:revokeToken()] Token Revoked")
else:
print("[auth] ERROR on token revoke")
else:
print ("[auth:revokeToken()] Must have set a token to revoke a token...")
def isExpired(self, expiration_datetime):
expired = False
#print ("[auth:isExpired()] Token Expires at " + expiration_datetime.strftime("%H:%M:%S"))
time_left = (expiration_datetime - datetime.datetime.now()).total_seconds()
#print ("[auth:isExpired()] Time Left on Token (in seconds): " + str(time_left))
if time_left < 1:
#print ("[auth:isExpired()] Token almost expired retrieving new token in two seconds.")
time.sleep( 1 )
expired = True
return expired