Skip to content

Commit

Permalink
introducing usage of the new crypto to handle certificates
Browse files Browse the repository at this point in the history
  • Loading branch information
Tomasz Wolniewicz committed Nov 5, 2024
1 parent 9513075 commit 4bec3b4
Showing 1 changed file with 88 additions and 65 deletions.
153 changes: 88 additions & 65 deletions devices/linux/Files/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@
from typing import List, Type, Union

NM_AVAILABLE = True
CRYPTO_AVAILABLE = True
NEW_CRYPTO_AVAILABLE = True
OPENSSL_CRYPTO_AVAILABLE = False
DEBUG_ON = False

parser = argparse.ArgumentParser(description='eduroam linux installer.')
Expand Down Expand Up @@ -81,8 +82,7 @@ def debug(msg) -> None:
"""Print debugging messages to stdout"""
if not DEBUG_ON:
return
else:
print("DEBUG:" + str(msg))
print("DEBUG:" + str(msg))


def byte_to_string(barray: List) -> str:
Expand All @@ -101,10 +101,17 @@ def byte_to_string(barray: List) -> str:


try:
from OpenSSL import crypto
crypto.load_pkcs12 # missing in newer versions
except (ImportError, AttributeError): # AttributeError sometimes thrown by old/broken OpenSSL versions
CRYPTO_AVAILABLE = False
from cryptography.hazmat.primitives.serialization import pkcs12
from cryptography.hazmat.backends import default_backend
from cryptography.x509.oid import NameOID
except ImportError:
NEW_CRYPTO_AVAILABLE = False
try:
from OpenSSL import crypto
crypto.load_pkcs12 # missing in newer versions
OPENSSL_CRYPTO_AVAILABLE = True
except (ImportError, AttributeError): # AttributeError sometimes thrown by old/broken OpenSSL versions
OPENSSL_CRYPTO_AVAILABLE = False


def detect_desktop_environment() -> str:
Expand Down Expand Up @@ -460,7 +467,7 @@ def prompt_nonempty_string(self, show: int, prompt: str, val: str = '') -> str:
show="*" if show == 0 else "")
if output:
return output

else:
command = []
if self.graphics == 'zenity':
Expand Down Expand Up @@ -511,13 +518,13 @@ def __get_username_password_atomic(self) -> None:
password1 = "b"
if self.graphics == 'tkinter':
import tkinter as tk

root = tk.Tk()
root.title(Config.title)

desc_label = tk.Label(root, text=Messages.credentials_prompt)
desc_label.grid(row=0, column=0, columnspan=2, sticky=tk.W)

username_label = tk.Label(root, text=Messages.username_prompt)
username_label.grid(row=1, column=0, sticky=tk.W)

Expand All @@ -535,17 +542,17 @@ def __get_username_password_atomic(self) -> None:

password1_entry = tk.Entry(root, show="*")
password1_entry.grid(row=3, column=1)

def submit(installer):
def inner():
nonlocal password, password1
(installer.username, password, password1) = (username_entry.get(), password_entry.get(), password1_entry.get())
root.destroy()
return inner

login_button = tk.Button(root, text=Messages.ok, command=submit(self))
login_button.grid(row=4, column=0, columnspan=2)

root.mainloop()
else:
if self.graphics == 'zenity':
Expand Down Expand Up @@ -605,7 +612,7 @@ def __get_username_password(self) -> None:
"""
if self.silent:
return
elif self.graphics in ('tkinter', 'zenity', 'yad'):
if self.graphics in ('tkinter', 'zenity', 'yad'):
self.__get_username_password_atomic()
else:
password = "a"
Expand Down Expand Up @@ -637,9 +644,9 @@ def __check_graphics(self, command) -> bool:
shell_command.wait()
if shell_command.returncode == 0:
self.graphics = command
debug("Using "+command)
return True
else:
return False
return False

def __get_graphics_support(self) -> None:
self.graphics = 'tty'
Expand All @@ -649,12 +656,12 @@ def __get_graphics_support(self) -> None:
return
if self.gui != 'tkinter':
if self.__check_graphics(self.gui):
return
return
try:
import tkinter
self.graphics = 'tkinter'
return
except:
except Exception:
pass
for cmd in ('yad', 'zenity', 'kdialog'):
if self.__check_graphics(cmd):
Expand All @@ -663,8 +670,27 @@ def __get_graphics_support(self) -> None:
def __process_p12(self) -> bool:
debug('process_p12')
pfx_file = get_config_path() + '/cat_installer/user.p12'
if CRYPTO_AVAILABLE:
debug("using crypto")
if NEW_CRYPTO_AVAILABLE:
debug("using new crypto")
try:
p12 = pkcs12.load_key_and_certificates(
open(pfx_file,'rb').read(),
self.password, backend=default_backend())
except Exception as error:
debug("Incorrect password ({}).".format(error))
return False
else:
if Config.use_other_tls_id:
return True
try:
self.username = p12[1].subject.\
get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
except crypto.Error:
self.username = p12[1].subject.\
get_attributes_for_oid(NameOID.EMAIL_ADDRESS)[0].value
return True
if OPENSSL_CRYPTO_AVAILABLE:
debug("using openssl crypto")
try:
p12 = crypto.load_pkcs12(open(pfx_file, 'rb').read(),
self.password)
Expand All @@ -681,46 +707,45 @@ def __process_p12(self) -> bool:
self.username = p12.get_certificate().\
get_subject().emailAddress
return True
else:
debug("using openssl")
command = ['openssl', 'pkcs12', '-in', pfx_file, '-passin',
'pass:' + self.password, '-nokeys', '-clcerts']
shell_command = subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, _ = shell_command.communicate()
if shell_command.returncode != 0:
debug("first password run failed")
command1 = ['openssl', 'pkcs12', '-legacy', '-in', pfx_file, '-passin',
'pass:' + self.password, '-nokeys', '-clcerts']
shell_command1 = subprocess.Popen(command1, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = shell_command1.communicate()
if shell_command1.returncode != 0:
return False
if Config.use_other_tls_id:
return True
out_str = out.decode('utf-8').strip()
# split only on commas that are not inside double quotes
subject = re.split(r'\s*[/,]\s*(?=([^"]*"[^"]*")*[^"]*$)',
re.findall(r'subject=/?(.*)$',
out_str, re.MULTILINE)[0])
cert_prop = {}
for field in subject:
if field:
cert_field = re.split(r'\s*=\s*', field)
cert_prop[cert_field[0].lower()] = cert_field[1]
if cert_prop['cn'] and re.search(r'@', cert_prop['cn']):
debug('Using cn: ' + cert_prop['cn'])
self.username = cert_prop['cn']
elif cert_prop['emailaddress'] and \
re.search(r'@', cert_prop['emailaddress']):
debug('Using email: ' + cert_prop['emailaddress'])
self.username = cert_prop['emailaddress']
else:
self.username = ''
self.alert("Unable to extract username "
"from the certificate")
debug("using openssl")
command = ['openssl', 'pkcs12', '-in', pfx_file, '-passin',
'pass:' + self.password, '-nokeys', '-clcerts']
shell_command = subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, _ = shell_command.communicate()
if shell_command.returncode != 0:
debug("first password run failed")
command1 = ['openssl', 'pkcs12', '-legacy', '-in', pfx_file, '-passin',
'pass:' + self.password, '-nokeys', '-clcerts']
shell_command1 = subprocess.Popen(command1, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = shell_command1.communicate()
if shell_command1.returncode != 0:
return False
if Config.use_other_tls_id:
return True
out_str = out.decode('utf-8').strip()
# split only on commas that are not inside double quotes
subject = re.split(r'\s*[/,]\s*(?=([^"]*"[^"]*")*[^"]*$)',
re.findall(r'subject=/?(.*)$',
out_str, re.MULTILINE)[0])
cert_prop = {}
for field in subject:
if field:
cert_field = re.split(r'\s*=\s*', field)
cert_prop[cert_field[0].lower()] = cert_field[1]
if cert_prop['cn'] and re.search(r'@', cert_prop['cn']):
debug('Using cn: ' + cert_prop['cn'])
self.username = cert_prop['cn']
elif cert_prop['emailaddress'] and \
re.search(r'@', cert_prop['emailaddress']):
debug('Using email: ' + cert_prop['emailaddress'])
self.username = cert_prop['emailaddress']
else:
self.username = ''
self.alert("Unable to extract username "
"from the certificate")
return True

def __select_p12_file(self) -> str:
"""
Expand Down Expand Up @@ -763,9 +788,9 @@ def __select_p12_file(self) -> str:
stderr=subprocess.PIPE)
cert, _ = shell_command.communicate()
if self.graphics == 'kdialog':
command = ['kdialog', '--getopenfilename',
'.', '*.p12 *.P12 *.pfx *.PFX | ' +
Messages.p12_filter, '--title=' + Messages.p12_title]
command = ['kdialog', '--getopenfilename', '.', +
# '.', '*.p12 *.P12 *.pfx *.PFX | ' + Messages.p12_filter, # removed filter due to the changed syntax in newer versions
'--title=' + Messages.p12_title]
shell_command = subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL)
cert, _ = shell_command.communicate()
Expand All @@ -782,8 +807,6 @@ def __select_p12_file(self) -> str:
filetypes=(("Certificate file",
("*.p12", "*.P12", "*.pfx",
"*.PFX")),))


return cert.decode('utf-8').strip()

@staticmethod
Expand Down Expand Up @@ -815,7 +838,7 @@ def __get_p12_cred(self):
else:
while not self.password:
self.password = self.prompt_nonempty_string(
0, Messages.enter_import_password)
0, Messages.enter_import_password).encode('utf-8')
if not self.__process_p12():
self.alert(Messages.incorrect_password)
self.password = ''
Expand Down

0 comments on commit 4bec3b4

Please sign in to comment.