Skip to content

Commit

Permalink
Pylint and 3-only modernization
Browse files Browse the repository at this point in the history
  • Loading branch information
gcoxmoz committed Jan 20, 2024
1 parent 46b7f18 commit 26f393e
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 62 deletions.
9 changes: 5 additions & 4 deletions duo_openvpn_mozilla/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def __init__(self):
except (configparser.NoOptionError, configparser.NoSectionError):
_base_facility = 'auth'
try:
self.event_facility = getattr(syslog, 'LOG_{}'.format(_base_facility.upper()))
self.event_facility = getattr(syslog, f'LOG_{_base_facility.upper()}')
except (AttributeError):
self.event_facility = syslog.LOG_AUTH

Expand Down Expand Up @@ -180,8 +180,8 @@ def local_authentication(self):
# Here we have a user not allowed to VPN in at all.
# This is some form of "their account is disabled" and/or
# they aren't in the approved ACL list.
self.log(summary=('FAIL: VPN user "{}" denied for not being '
'allowed to use the VPN'.format(username)),
self.log(summary=(f'FAIL: VPN user "{username}" denied for '
'not being allowed to use the VPN'),
severity='INFO',
details={'username': username,
'sourceipaddress': client_ipaddr,
Expand Down Expand Up @@ -220,7 +220,8 @@ def local_authentication(self):
if session_state is None:
# No variable was sent: we're not in external-auth mode. We're not authoritative.
pass
elif session_state in ['Initial', 'Expired', 'Invalid', 'AuthenticatedEmptyUser', 'ExpiredEmptyUser']:
elif session_state in ['Initial', 'Expired', 'Invalid',
'AuthenticatedEmptyUser', 'ExpiredEmptyUser']:
# We are in a state where we haven't been approved by the server. Ask Duo.
pass
elif session_state in ['Authenticated']:
Expand Down
70 changes: 31 additions & 39 deletions duo_openvpn_mozilla/duo_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def __init__(self, *args, **kwargs):
self.log_func = kwargs.pop('log_func', None)
self.user_config = None

super(DuoAPIAuth, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)

def load_user_to_verify(self, user_config):
"""
Expand Down Expand Up @@ -84,7 +84,7 @@ def _preflight(self):
# server timestamp. So we don't care about the return values.
self.ping() # parent-call
except socket.error as err:
self.log(summary='FAIL: DuoAPIAuth ping {}'.format(err),
self.log(summary=f'FAIL: DuoAPIAuth ping {err}',
details={'error': 'true',
'success': 'false', },
severity='CRITICAL')
Expand All @@ -96,14 +96,14 @@ def _preflight(self):
self.check() # parent-call
except socket.error as err:
# Super unlikely. Ping should've hit this.
self.log(summary='FAIL: DuoAPIAuth check socket {}'.format(err),
self.log(summary=f'FAIL: DuoAPIAuth check socket {err}',
details={'error': 'true',
'success': 'false', },
severity='CRITICAL')
return False
except RuntimeError as err:
# This is when the call returns a 401:
self.log(summary='FAIL: DuoAPIAuth check runtime {}'.format(err),
self.log(summary=f'FAIL: DuoAPIAuth check runtime {err}',
details={'error': 'true',
'success': 'false', },
severity='CRITICAL')
Expand All @@ -126,16 +126,14 @@ def _preauth(self):
# parent-call
except socket.error as err:
# Super unlikely. Ping should've hit this.
self.log(summary=('FAIL: DuoAPIAuth preauth '
'socket-failed {}').format(err),
self.log(summary=f'FAIL: DuoAPIAuth preauth socket-failed {err}',
details={'error': 'true',
'success': 'false', },
severity='CRITICAL')
return None
except RuntimeError as err:
# This is when the call returns a 400 for bad parameters.
self.log(summary=('FAIL: DuoAPIAuth preauth '
'runtime-failed {}').format(err),
self.log(summary=f'FAIL: DuoAPIAuth preauth runtime-failed {err}',
details={'error': 'true',
'success': 'false', },
severity='CRITICAL')
Expand All @@ -145,8 +143,8 @@ def _preauth(self):
# We shouldn't need this once
# https://github.com/duosecurity/duo_client_python/issues/111
# is handled
self.log(summary=('FAIL: DuoAPIAuth preauth '
'had a failure talking to Duo. {}').format(err),
self.log(summary=('FAIL: DuoAPIAuth preauth had a failure '
f'talking to Duo. {err}'),
details={'error': 'true',
'success': 'false', },
severity='CRITICAL')
Expand All @@ -165,10 +163,10 @@ def _auth(self):
"""
# The mandatory args are here. We add more later depending
# on which factor we're using.
passing_args = dict(
username=self.user_config.username,
factor=self.user_config.factor,
ipaddr=self.user_config.client_ipaddr,)
passing_args = {
'username': self.user_config.username,
'factor': self.user_config.factor,
'ipaddr': self.user_config.client_ipaddr}
if self.user_config.factor == 'passcode':
# Passcode authing must pass the passcode. Duh.
passing_args.update(
Expand All @@ -194,8 +192,8 @@ def _auth(self):
elif self.user_config.factor == 'sms':
# sms is not an auth mechanism, but is a way to get new codes.
# This is not our job, so we don't help people out on this.
self.log(summary=('FAIL: User "{}" denied for trying '
'sms auth'.format(self.user_config.username)),
self.log(summary=(f'FAIL: User "{self.user_config.username}" '
'denied for trying sms auth'),
severity='INFO',
details={
'username': self.user_config.username,
Expand Down Expand Up @@ -226,8 +224,7 @@ def _auth(self):
res = self.auth(**passing_args) # parent-call
except socket.error as err:
# Super unlikely. Ping should've hit this.
self.log(summary='FAIL: DuoAPIAuth auth '
'socket-failed {}'.format(err),
self.log(summary=f'FAIL: DuoAPIAuth auth socket-failed {err}',
details={'error': 'true',
'success': 'false', },
severity='CRITICAL')
Expand All @@ -236,8 +233,7 @@ def _auth(self):
# This is when the call returns a 400.for bad parameters.
# This is hard to simulate, as we pre-massage the parameters
# that go into an auth call, so any failure here is hard to test.
self.log(summary='FAIL: DuoAPIAuth auth '
'runtime-failed {}'.format(err),
self.log(summary=f'FAIL: DuoAPIAuth auth runtime-failed {err}',
details={'error': 'true',
'success': 'false', },
severity='CRITICAL')
Expand Down Expand Up @@ -272,8 +268,8 @@ def _do_mfa_for_user(self):
'success': 'false', },)
return False
if res['result'] == 'allow':
_summary = ('SUCCESS: User "{user}" authenticated by DuoAPIAuth'
'').format(user=self.user_config.username)
_summary = (f'SUCCESS: User "{self.user_config.username}" '
'authenticated by DuoAPIAuth')
self.log(summary=_summary,
severity='INFO',
details={
Expand All @@ -282,9 +278,8 @@ def _do_mfa_for_user(self):
'sourceipaddress': self.user_config.client_ipaddr,
'success': 'true', },)
return True
_summary = ('FAIL: User "{user}" denied by DuoAPIAuth: '
'{msg}').format(user=self.user_config.username,
msg=res['status_msg'])
_summary = (f'FAIL: User "{self.user_config.username}" '
f'denied by DuoAPIAuth: {res["status_msg"]}')
self.log(summary=_summary,
severity='WARNING',
details={
Expand All @@ -310,9 +305,8 @@ def main_auth(self):
# Failed preflight - there's no connection to Duo.
# Fail safe / fail secure.
if self._fail_open:
self.log(summary=('SUCCESS: Duo failed open, '
'user "{}" allowed '
'in'.format(self.user_config.username)),
self.log(summary=('SUCCESS: Duo failed open, user '
f'"{self.user_config.username}" allowed in'),
severity='CRITICAL',
details={
'username': self.user_config.username,
Expand All @@ -332,8 +326,8 @@ def main_auth(self):
# the likely case is that there was a parameter issue, and that
# means we didn't get an approval, so kick the user out because
# something is bad.
self.log(summary=('FAIL: User "{}" denied due to preauth failure'
''.format(self.user_config.username)),
self.log(summary=(f'FAIL: User "{self.user_config.username}" '
'denied due to preauth failure'),
severity='ERROR',
details={
'username': self.user_config.username,
Expand All @@ -356,8 +350,8 @@ def main_auth(self):
# would check in with Duo, yet be allowed in without doing
# a MFA proof. We must return True since they're approved,
# but it's an unusual case to come across.
self.log(summary=('SUCCESS: User "{}" allowed without MFA'
''.format(self.user_config.username)),
self.log(summary=(f'SUCCESS: User "{self.user_config.username}" '
'allowed without MFA'),
severity='INFO',
details={
'username': self.user_config.username,
Expand All @@ -369,9 +363,8 @@ def main_auth(self):
# The key here is, this must return False to indicate a failed
# login attempt.
# It's not our job to enroll, so kick them out.
self.log(summary=('FAIL: Unexpected/non-enrolled Duo '
'user "{}" denied'
''.format(self.user_config.username)),
self.log(summary=('FAIL: Unexpected/non-enrolled Duo user '
f'"{self.user_config.username}" denied'),
severity='INFO',
details={
'username': self.user_config.username,
Expand All @@ -382,8 +375,8 @@ def main_auth(self):
# We do not have a perpetually-locked-out user to test against.
# The key here is, this must return False to indicate a failed
# login attempt.
self.log(summary=('FAIL: User "{}" explicitly denied by Duo'
''.format(self.user_config.username)),
self.log(summary=(f'FAIL: User "{self.user_config.username}" '
'explicitly denied by Duo'),
severity='INFO',
details={
'username': self.user_config.username,
Expand All @@ -405,8 +398,7 @@ def main_auth(self):
# and will log when it gets answers
return self._do_mfa_for_user()
# The reply from Duo is unknown. Probably an API changed.
_summary = ('FAIL: Unexpected result from DuoAPIAuth: '
'{msg}').format(msg=preauth['result'])
_summary = f'FAIL: Unexpected result from DuoAPIAuth: {preauth["result"]}'
self.log(summary=_summary,
severity='ERROR',
details={
Expand Down
19 changes: 7 additions & 12 deletions test/integration/test_duo_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def setUp(self):
#
os.environ['untrusted_ip'] = 'testing-ip-Unknown-is-OK'
os.environ['common_name'] = self.normal_user
user_creds = dict()
user_creds = {}
for varname in OpenVPNCredentials.DUO_RESERVED_WORDS:
os.environ['password'] = varname
res = OpenVPNCredentials()
Expand Down Expand Up @@ -242,8 +242,7 @@ def _auth_testing_run(self, testcase, answer):
return self.skipTest('because of .deep_testing preference')
self.library.load_user_to_verify(self.user_data[testcase])
if testcase != 'passcode' and not self._can_we_run_a_test(testcase):
return self.skipTest('incapable device for {tc}'.format(
tc=testcase))
return self.skipTest(f'incapable device for {testcase}')
res = self.library._auth()
self.assertIsInstance(res, dict,
'_auth must return a dict')
Expand All @@ -252,21 +251,19 @@ def _auth_testing_run(self, testcase, answer):
self.assertIn(res['result'], ['allow', 'deny'],
'_auth result must be "allow" or "deny"')
self.assertEqual(res['result'], answer,
'_auth result must be "{ans}"'.format(ans=answer))
f'_auth result must be "{answer}"')

def _mfa_testing_run(self, testcase, answer):
if not self.deep_test_mfa: # pragma: no cover
return self.skipTest('because of .deep_testing preference')
self.library.load_user_to_verify(self.user_data[testcase])
if testcase != 'passcode' and not self._can_we_run_a_test(testcase):
return self.skipTest('incapable device for {tc}'.format(
tc=testcase))
return self.skipTest(f'incapable device for {testcase}')
res = self.library._do_mfa_for_user()
self.assertIsInstance(res, bool,
'_do_mfa_for_user must return a bool')
self.assertEqual(res, answer,
'_do_mfa_for_user result must '
'be "{ans}"'.format(ans=answer))
f'_do_mfa_for_user result must be "{answer}"')

def test_auth_03(self):
""" _auth with auto - PLEASE ALLOW """
Expand Down Expand Up @@ -411,14 +408,12 @@ def _main_testing_run(self, testcase, answer):
return self.skipTest('because of .deep_testing preference')
self.library.load_user_to_verify(self.user_data[testcase])
if testcase != 'passcode' and not self._can_we_run_a_test(testcase):
return self.skipTest('incapable device for {tc}'.format(
tc=testcase))
return self.skipTest(f'incapable device for {testcase}')
res = self.library.main_auth()
self.assertIsInstance(res, bool,
'main_auth must return a bool')
self.assertEqual(res, answer,
'main_auth result must '
'be "{ans}"'.format(ans=answer))
f'main_auth result must be "{answer}"')

def test_main_03(self):
""" main_auth with auto - PLEASE ALLOW """
Expand Down
4 changes: 2 additions & 2 deletions test/unit/test_duo_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def setUp(self):
config.set('duo-credentials', 'IKEY', 'DI9QQ99X9MK4H99RJ9FF')
config.set('duo-credentials', 'SKEY', '2md9rw5xeyxt8c648dgkmdrg3zpvnhj5b596mgku')
config.set('duo-credentials', 'HOST', 'api-9f134ff9.duosekurity.com')
with open(self.testing_conffile, 'w') as configfile:
with open(self.testing_conffile, 'w', encoding='utf-8') as configfile:
config.write(configfile)

with mock.patch.object(DuoOpenVPN, 'CONFIG_FILE_LOCATIONS',
Expand All @@ -46,7 +46,7 @@ def setUp(self):
self.main_object = DuoOpenVPN()
os.environ['untrusted_ip'] = 'testing-ip-Unknown-is-OK'
os.environ['common_name'] = 'bob'
user_creds = dict()
user_creds = {}
for varname in OpenVPNCredentials.DUO_RESERVED_WORDS:
os.environ['password'] = varname
res = OpenVPNCredentials()
Expand Down
11 changes: 6 additions & 5 deletions test/unit/test_duo_openvpn.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def setUp(self):
config.set('duo-credentials', 'IKEY', 'DI9QQ99X9MK4H99RJ9FF')
config.set('duo-credentials', 'SKEY', '2md9rw5xeyxt8c648dgkmdrg3zpvnhj5b596mgku')
config.set('duo-credentials', 'HOST', 'api-9f134ff9.duosekurity.com')
with open(self.testing_conffile, 'w') as configfile:
with open(self.testing_conffile, 'w', encoding='utf-8') as configfile:
config.write(configfile)
with mock.patch.object(DuoOpenVPN, 'CONFIG_FILE_LOCATIONS',
new=[self.testing_conffile]):
Expand Down Expand Up @@ -81,7 +81,7 @@ def test_05_ingest_bad_config_file(self):
def test_06_ingest_config_from_file(self):
""" With an actual config file, get a populated ConfigParser """
test_reading_file = '/tmp/test-reader.txt'
with open(test_reading_file, 'w') as filepointer:
with open(test_reading_file, 'w', encoding='utf-8') as filepointer:
filepointer.write('[aa]\nbb = cc\n')
filepointer.close()
with mock.patch.object(DuoOpenVPN, 'CONFIG_FILE_LOCATIONS',
Expand Down Expand Up @@ -120,7 +120,7 @@ def test_08_ingest_configs(self):
config.add_section('duo-openvpn')
config.set('duo-openvpn', 'syslog-events-send', 'True')
config.set('duo-openvpn', 'syslog-events-facility', 'local5')
with open(self.testing_conffile, 'w') as configfile:
with open(self.testing_conffile, 'w', encoding='utf-8') as configfile:
config.write(configfile)
with mock.patch.object(DuoOpenVPN, 'CONFIG_FILE_LOCATIONS',
new=[self.testing_conffile]):
Expand All @@ -144,7 +144,7 @@ def test_09_ingest_stupidity(self):
config.set('duo-behavior', 'duo-timeout', '-5')
config.add_section('duo-openvpn')
config.set('duo-openvpn', 'syslog-events-facility', 'junk')
with open(self.testing_conffile, 'w') as configfile:
with open(self.testing_conffile, 'w', encoding='utf-8') as configfile:
config.write(configfile)
with mock.patch.object(DuoOpenVPN, 'CONFIG_FILE_LOCATIONS',
new=[self.testing_conffile]):
Expand All @@ -164,7 +164,8 @@ def test_10_log_nosend(self):
def test_11_log_send(self):
''' Test the log method tries to send '''
datetime_mock = mock.Mock(wraps=datetime.datetime)
datetime_mock.now.return_value = datetime.datetime(2020, 12, 25, 13, 14, 15, 123456, tzinfo=datetime.timezone.utc)
datetime_mock.now.return_value = datetime.datetime(2020, 12, 25, 13, 14, 15, 123456,
tzinfo=datetime.timezone.utc)
self.library.event_send = True
self.library.event_facility = syslog.LOG_LOCAL1
with mock.patch('syslog.openlog') as mock_openlog, \
Expand Down

0 comments on commit 26f393e

Please sign in to comment.