-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
34adf7d
commit cf5af80
Showing
13 changed files
with
1,395 additions
and
8 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
import requests | ||
import json | ||
import copy | ||
import logging | ||
|
||
from atlas.auth.websso.ssocookies import SSOCookies | ||
from atlas.settings.jiraclient import JIRA_CONFIG | ||
_logger = logging.getLogger('prodtaskwebui') | ||
|
||
|
||
class JIRAClient(object): | ||
def __init__(self, sso_cookies=None): | ||
self.sso_cookies = sso_cookies | ||
|
||
def authorize(self): | ||
try: | ||
self.sso_cookies = SSOCookies( | ||
JIRA_CONFIG['auth_url'], | ||
pem_cert_file_path=JIRA_CONFIG['cert'], | ||
pem_cert_key_path=JIRA_CONFIG['cert_key'] | ||
).get() | ||
return self.sso_cookies | ||
except Exception as ex: | ||
raise Exception('JIRAClient: SSO authentication error: {0}'.format(str(ex))) | ||
|
||
def create_issue(self, summary, description): | ||
if not self.sso_cookies: | ||
raise Exception('JIRAClient: not authorized') | ||
|
||
issue = copy.deepcopy(JIRA_CONFIG['issue_template']) | ||
issue['fields']['summary'] = issue['fields']['summary'] % summary | ||
issue['fields']['description'] = issue['fields']['description'] % description | ||
|
||
headers = {'Content-type': 'application/json'} | ||
|
||
response = requests.post(JIRA_CONFIG['issue_url'], | ||
data=json.dumps(issue), | ||
headers=headers, | ||
cookies=self.sso_cookies, | ||
verify=JIRA_CONFIG['verify_ssl_certificates']) | ||
|
||
if response.status_code != requests.codes.created: | ||
response.raise_for_status() | ||
|
||
result = json.loads(response.content) | ||
|
||
return result['key'] | ||
|
||
def delete_issue(self, issue_key, delete_sub_issues=True): | ||
if not self.sso_cookies: | ||
raise Exception('JIRAClient: not authorized') | ||
|
||
issue_url = '{0}{1}?deleteSubtasks={2}'.format( | ||
JIRA_CONFIG['issue_url'], | ||
issue_key, | ||
str(delete_sub_issues).lower() | ||
) | ||
|
||
response = requests.delete(issue_url, | ||
cookies=self.sso_cookies, | ||
verify=JIRA_CONFIG['verify_ssl_certificates']) | ||
|
||
if response.status_code != requests.codes.no_content: | ||
response.raise_for_status() | ||
|
||
return True | ||
|
||
def create_sub_issue(self, parent_issue_key, summary, description): | ||
if not self.sso_cookies: | ||
raise Exception('JIRAClient: not authorized') | ||
|
||
issue = copy.deepcopy(JIRA_CONFIG['sub_issue_template']) | ||
issue['fields']['summary'] = issue['fields']['summary'] % summary | ||
issue['fields']['description'] = issue['fields']['description'] % description | ||
issue['fields']['parent']['key'] = issue['fields']['parent']['key'] % parent_issue_key | ||
|
||
headers = {'Content-type': 'application/json'} | ||
|
||
response = requests.post(JIRA_CONFIG['issue_url'], | ||
data=json.dumps(issue), | ||
headers=headers, | ||
cookies=self.sso_cookies, | ||
verify=JIRA_CONFIG['verify_ssl_certificates']) | ||
|
||
if response.status_code != requests.codes.created: | ||
response.raise_for_status() | ||
|
||
result = json.loads(response.content) | ||
|
||
return result['key'] | ||
|
||
def log_exception(self, issue_key, exception, log_msg=None): | ||
try: | ||
if not log_msg: | ||
log_msg = '{0}: {1}'.format(type(exception).__name__, str(exception)) | ||
_logger.exception(log_msg) | ||
self.add_issue_comment(issue_key, log_msg) | ||
except Exception as ex: | ||
if _logger: | ||
_logger.exception('log_exception failed: {0}'.format(str(ex))) | ||
|
||
def add_issue_comment(self, issue_key, comment_body): | ||
if not self.sso_cookies: | ||
raise Exception('JIRAClient: not authorized') | ||
|
||
comment = JIRA_CONFIG['issue_comment_template'].copy() | ||
comment['body'] = comment['body'] % comment_body | ||
|
||
headers = {'Content-type': 'application/json'} | ||
comment_url = '{0}{1}/comment'.format(JIRA_CONFIG['issue_url'], issue_key) | ||
|
||
response = requests.post(comment_url, | ||
data=json.dumps(comment), | ||
headers=headers, | ||
cookies=self.sso_cookies, | ||
verify=JIRA_CONFIG['verify_ssl_certificates']) | ||
|
||
if response.status_code != requests.codes.created: | ||
response.raise_for_status() | ||
|
||
return True | ||
|
||
def close_issue(self, issue_key, comment): | ||
if not self.sso_cookies: | ||
raise Exception('JIRAClient: not authorized') | ||
|
||
issue_close_request = copy.deepcopy(JIRA_CONFIG['issue_close_template']) | ||
issue_close_request['update']['comment'][0]['add']['body'] = \ | ||
issue_close_request['update']['comment'][0]['add']['body'] % comment | ||
|
||
headers = {'Content-type': 'application/json'} | ||
transitions_url = '{0}{1}/transitions'.format(JIRA_CONFIG['issue_url'], issue_key) | ||
|
||
response = requests.post(transitions_url, | ||
data=json.dumps(issue_close_request), | ||
headers=headers, | ||
cookies=self.sso_cookies, | ||
verify=JIRA_CONFIG['verify_ssl_certificates']) | ||
|
||
if response.status_code != requests.codes.no_content: | ||
response.raise_for_status() | ||
|
||
return True |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
__version__ = '0.5.5' | ||
|
||
import os.path | ||
import io | ||
import re | ||
import urllib.parse | ||
import pycurl | ||
import html.parser | ||
import base64 | ||
import getpass | ||
|
||
|
||
# noinspection PyUnresolvedReferences, PyBroadException | ||
class SSOCookies(object): | ||
def __init__(self, url, pem_cert_file_path=None, pem_cert_key_path=None, encoding='utf-8'): | ||
self.user_agent_cert = 'curl-sso-certificate/{0} (Mozilla)'.format(__version__) | ||
self.adfs_ep = '/adfs/ls' | ||
self.auth_error = 'HTTP Error 401.2 - Unauthorized' | ||
self.encoding = encoding | ||
|
||
if not pem_cert_file_path or not pem_cert_key_path: | ||
raise Exception('SSOCookies: certificate and/or private key file is not specified') | ||
|
||
if pem_cert_file_path: | ||
if not os.path.isfile(pem_cert_file_path): | ||
raise Exception('SSOCookies: certificate file {0} is not found'.format(pem_cert_file_path)) | ||
if pem_cert_key_path: | ||
if not os.path.isfile(pem_cert_key_path): | ||
raise Exception('SSOCookies: key file {0} is not found'.format(pem_cert_key_path)) | ||
|
||
self.curl = pycurl.Curl() | ||
self.curl.setopt(self.curl.COOKIEFILE, '') | ||
self.curl.setopt(self.curl.USERAGENT, self.user_agent_cert) | ||
self.curl.setopt(self.curl.SSLCERT, pem_cert_file_path) | ||
self.curl.setopt(self.curl.SSLCERTTYPE, 'PEM') | ||
self.curl.setopt(self.curl.SSLKEY, pem_cert_key_path) | ||
self.curl.setopt(self.curl.SSLKEYTYPE, 'PEM') | ||
self.curl.setopt(self.curl.FOLLOWLOCATION, 1) | ||
self.curl.setopt(self.curl.UNRESTRICTED_AUTH, 1) | ||
self.curl.setopt(self.curl.HEADER, 0) | ||
self.curl.setopt(self.curl.SSL_VERIFYPEER, 0) | ||
self.curl.setopt(self.curl.SSL_VERIFYHOST, 0) | ||
self.curl.setopt(self.curl.URL, url) | ||
|
||
_, effective_url = self._request() | ||
|
||
if self.adfs_ep not in effective_url: | ||
raise Exception('SSOCookies: the service does not support CERN SSO') | ||
|
||
self.curl.setopt(self.curl.URL, effective_url) | ||
|
||
response, effective_url = self._request() | ||
|
||
if self.auth_error in response: | ||
raise Exception('SSOCookies: authentication error') | ||
|
||
result = re.search('form .+?action="([^"]+)"', response) | ||
service_provider_url = result.groups()[0] | ||
form_params = re.findall('input type="hidden" name="([^"]+)" value="([^"]+)"', response) | ||
form_params = [(item[0], html.unescape(item[1])) for item in form_params] | ||
|
||
self.curl.setopt(self.curl.URL, service_provider_url) | ||
self.curl.setopt(self.curl.POSTFIELDS, urllib.parse.urlencode(form_params)) | ||
self.curl.setopt(self.curl.POST, 1) | ||
|
||
self._request() | ||
|
||
self.cookie_list = self.curl.getinfo(self.curl.INFO_COOKIELIST) | ||
|
||
def _request(self): | ||
response = io.BytesIO() | ||
self.curl.setopt(self.curl.WRITEFUNCTION, response.write) | ||
self.curl.perform() | ||
response = response.getvalue().decode(self.encoding) | ||
effective_url = self.curl.getinfo(self.curl.EFFECTIVE_URL) | ||
return response, effective_url | ||
|
||
def get(self): | ||
cookies = {} | ||
for item in self.cookie_list: | ||
name = item.split('\t')[5] | ||
value = item.split('\t')[6] | ||
cookies.update({name: value}) | ||
return cookies | ||
|
||
def extract_username(self): | ||
try: | ||
cookies = self.get() | ||
return base64.b64decode(cookies['FedAuth']).split(',')[1].split('\\')[-1] | ||
except Exception: | ||
return getpass.getuser() |
Empty file.
Oops, something went wrong.