From 1a56f6ccf656f9ae2c5291c6a5a71409d25a6bf6 Mon Sep 17 00:00:00 2001 From: Nathan V Date: Wed, 2 May 2018 14:23:51 -0700 Subject: [PATCH] Support more MFA * Support SMS OTP * Support Call OTP * Support Question type MFA * Separate okta_saml * Minor fix in OktaSaml to improve user messaging if appid isn't found * Updated README * Version bump to 0.4.0 --- README.md | 6 + aws_okta_keyman/keyman.py | 34 ++- aws_okta_keyman/metadata.py | 2 +- aws_okta_keyman/okta.py | 184 +++++++++++----- aws_okta_keyman/okta_saml.py | 65 ++++++ aws_okta_keyman/test/keyman_test.py | 69 +++++- aws_okta_keyman/test/okta_test.py | 319 ++++++++++++++++++++++++---- 7 files changed, 571 insertions(+), 108 deletions(-) create mode 100644 aws_okta_keyman/okta_saml.py diff --git a/README.md b/README.md index c088c66..0a0a544 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,12 @@ successful the user may close the browser or tab. * Duo Auth * Okta OTP * Google Auth OTP +* SMS OTP +* Call OTP +* Question/Answer + +Windows Hello, U2F, email, and physical token (RSA, Symantec) are not supported +at this time. ## Multiple AWS Roles diff --git a/aws_okta_keyman/keyman.py b/aws_okta_keyman/keyman.py index 27f787a..cd7740e 100644 --- a/aws_okta_keyman/keyman.py +++ b/aws_okta_keyman/keyman.py @@ -26,7 +26,7 @@ import rainbow_logging_handler import requests -from aws_okta_keyman import aws, okta +from aws_okta_keyman import aws, okta, okta_saml from aws_okta_keyman.config import Config from aws_okta_keyman.metadata import __desc__, __version__ @@ -123,13 +123,14 @@ def init_okta(self, password): """ try: if self.config.oktapreview is True: - self.okta_client = okta.OktaSaml(self.config.org, - self.config.username, - password, oktapreview=True) + self.okta_client = okta_saml.OktaSaml(self.config.org, + self.config.username, + password, + oktapreview=True) else: - self.okta_client = okta.OktaSaml(self.config.org, - self.config.username, - password) + self.okta_client = okta_saml.OktaSaml(self.config.org, + self.config.username, + password) except okta.EmptyInput: self.log.fatal('Cannot enter a blank string for any input') @@ -156,6 +157,17 @@ def auth_okta(self): verified = self.okta_client.validate_mfa(err.fid, err.state_token, passcode) + except okta.AnswerRequired as err: + self.log.warning('Question/Answer MFA response required.') + self.log.warning("{}".format( + err.factor['profile']['questionText']) + ) + verified = False + while not verified: + answer = self.user_input('Answer: ') + verified = self.okta_client.validate_answer(err.factor['id'], + err.state_token, + answer) except okta.UnknownError as err: self.log.fatal("Fatal error: {}".format(err)) sys.exit(1) @@ -173,8 +185,12 @@ def handle_multiple_roles(self, session): def start_session(self): """Initialize AWS session object.""" - assertion = self.okta_client.get_assertion(appid=self.config.appid, - apptype='amazon_aws') + try: + assertion = self.okta_client.get_assertion(appid=self.config.appid, + apptype='amazon_aws') + except okta.UnknownError: + sys.exit(1) + return aws.Session(assertion, profile=self.config.name) def aws_auth_loop(self): diff --git a/aws_okta_keyman/metadata.py b/aws_okta_keyman/metadata.py index ae3d977..dd25e3a 100644 --- a/aws_okta_keyman/metadata.py +++ b/aws_okta_keyman/metadata.py @@ -14,7 +14,7 @@ # Copyright 2018 Nathan V """Package metadata.""" -__version__ = '0.3.3' +__version__ = '0.4.0' __desc__ = 'AWS Okta Keyman' __desc_long__ = (''' =============== diff --git a/aws_okta_keyman/okta.py b/aws_okta_keyman/okta.py index 222c428..7665f02 100644 --- a/aws_okta_keyman/okta.py +++ b/aws_okta_keyman/okta.py @@ -17,16 +17,14 @@ # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -"""This contains all of the Okta-specific code we need.""" +"""This contains the Okta client code.""" from __future__ import unicode_literals -import base64 import logging import time from multiprocessing import Process import sys import webbrowser -import bs4 import requests from aws_okta_keyman.duo import Duo @@ -65,6 +63,15 @@ def __init__(self, fid, state_token, provider): super(PasscodeRequired, self).__init__() +class AnswerRequired(BaseException): + """A 2FA Passcode Must Be Entered.""" + + def __init__(self, factor, state_token): + self.factor = factor + self.state_token = state_token + super(AnswerRequired, self).__init__() + + class OktaVerifyRequired(BaseException): """OktaVerify Authentication Is Required.""" @@ -160,24 +167,61 @@ def validate_mfa(self, fid, state_token, passcode): Returns: True/False whether or not authentication was successful """ - if len(passcode) != 6: - LOG.error('Passcodes must be 6 digits') + if len(passcode) > 6 or len(passcode) < 5: + LOG.error('Passcodes must be 5 or 6 digits') + return False + + valid = self.send_user_response(fid, state_token, passcode, 'passCode') + if valid: + self.set_token(valid) + return True + else: return False + def validate_answer(self, fid, state_token, answer): + """Validate an Okta user with Question-based MFA. + + Takes in the supplied Factor ID (fid), State Token and user supplied + Passcode, and validates the auth. If successful, sets the session + token. If invalid, raises an exception. + + Args: + fid: Okta Factor ID (returned in the PasscodeRequired exception) + state_token: State Tken (returned in the PasscodeRequired + exception) + answer: The user-supplied answer to verify + + Returns: + True/False whether or not authentication was successful + """ + if len(answer) == 0: + LOG.error('Answer cannot be blank') + return False + + valid = self.send_user_response(fid, state_token, answer, 'answer') + if valid: + self.set_token(valid) + return True + else: + return False + + def send_user_response(self, fid, state_token, user_response, resp_type): + """Call Okta with a factor response and verify it.""" path = '/authn/factors/{fid}/verify'.format(fid=fid) data = {'fid': fid, 'stateToken': state_token, - 'passCode': passcode} + resp_type: user_response} try: - ret = self._request(path, data) + return self._request(path, data) except requests.exceptions.HTTPError as err: if err.response.status_code == 403: LOG.error('Invalid Passcode Detected') return False - raise UnknownError(err.response.body) + if err.response.status_code == 401: + LOG.error('Invalid Passcode Retries Exceeded') + raise UnknownError('Retries exceeded') - self.set_token(ret) - return True + raise UnknownError(err.response.body) def okta_verify(self, fid, state_token): """Trigger an Okta Push Verification and waits. @@ -309,61 +353,87 @@ def handle_mfa_response(self, ret): """In the case of an MFA response evaluate the response and handle accordingly based on available MFA factors. """ - otp_possible = False - otp_provider = None + response_types = ['sms', 'question', 'call', 'token:software:totp'] + push_factors = [] + response_factors = [] for factor in ret['_embedded']['factors']: if factor['factorType'] == 'push': - if self.okta_verify(factor['id'], ret['stateToken']): + LOG.debug('Okta Verify factor found') + push_factors.append(factor) + if factor['provider'] == 'DUO': + LOG.debug('Duo Auth factor found') + push_factors.append(factor) + if factor['factorType'] in response_types: + LOG.debug("{} factor found".format(factor['factorType'])) + response_factors.append(factor) + + if self.handle_push_factors(push_factors, ret['stateToken']): + return True + + self.handle_response_factors(response_factors, ret['stateToken']) + + # If we haven't returned or raised yet the factor requested isn't + # supported + LOG.debug("Factors from Okta: {}".format( + ret['_embedded']['factors'])) + LOG.fatal('MFA type in use is unsupported') + raise UnknownError('MFA type in use is unsupported') + + def handle_push_factors(self, factors, state_token): + """Handle any push-type factors.""" + for factor in factors: + if factor['factorType'] == 'push': + LOG.debug('Okta Verify factor found') + if self.okta_verify(factor['id'], state_token): return True if factor['provider'] == 'DUO': - if self.duo_auth(factor['id'], ret['stateToken']): + LOG.debug('Duo Auth factor found') + if self.duo_auth(factor['id'], state_token): return True + return False + + def handle_response_factors(self, factors, state_token): + """Handle any OTP-type factors.""" + otp_provider = None + otp_factor = None + for factor in factors: + if factor['factorType'] == 'sms': + self.request_otp(factor['id'], state_token, 'SMS') + phone = factor['profile']['phoneNumber'] + otp_provider = "SMS ({})".format(phone) + otp_factor = factor['id'] + break + if factor['factorType'] == 'call': + self.request_otp(factor['id'], state_token, 'phone call') + phone = factor['profile']['phoneNumber'] + otp_provider = "call ({})".format(phone) + otp_factor = factor['id'] + break + if factor['factorType'] == 'question': + raise AnswerRequired(factor, state_token) if factor['factorType'] == 'token:software:totp': - # Handle OTP separately in case we can do Okta or Duo but fail - # then we can fall back to OTP - LOG.debug('Software OTP option found') otp_provider = factor['provider'] - otp_possible = True + otp_factor = factor['id'] - if otp_possible: + if otp_provider: raise PasscodeRequired( - fid=factor['id'], - state_token=ret['stateToken'], + fid=otp_factor, + state_token=state_token, provider=otp_provider) - else: - # Log out the factors to make debugging MFA issues easier - LOG.debug("Factors from Okta: {}".format( - ret['_embedded']['factors'])) - LOG.fatal('MFA type in use is unsupported') - raise UnknownError('MFA type in use is unsupported') - - -class OktaSaml(Okta): - """Handle the SAML part of talking to Okta.""" - - def assertion(self, saml): - """Parse the assertion from the SAML response.""" - assertion = '' - soup = bs4.BeautifulSoup(saml, 'html.parser') - for inputtag in soup.find_all('input'): - if inputtag.get('name') == 'SAMLResponse': - assertion = inputtag.get('value') - return base64.b64decode(assertion) - - def get_assertion(self, appid, apptype): - """Call Okta and get the assertion.""" - path = '{url}/home/{apptype}/{appid}'.format( - url=self.base_url, apptype=apptype, appid=appid) - resp = self.session.get(path, - params={'onetimetoken': self.session_token}) - LOG.debug(resp.__dict__) - try: - resp.raise_for_status() - except (requests.exceptions.HTTPError, - requests.exceptions.ConnectionError) as err: - LOG.error('Unknown error: {msg}'.format( - msg=str(err.response.__dict__))) - raise UnknownError() - - return self.assertion(resp.text) + def request_otp(self, fid, state_token, otp_type): + """Trigger an OTP call, SMS, or other and return + + We trigger the push, and then immediately return as the next step is + essentially just an OTP code entry + + Args: + fid: Okta Factor ID used to trigger the push + state_token: State Token allowing us to trigger the push + otp_type: String shown in log for OTP type + """ + LOG.warning("Okta {} being requested...".format(otp_type)) + path = '/authn/factors/{fid}/verify'.format(fid=fid) + data = {'fid': fid, + 'stateToken': state_token} + self._request(path, data) diff --git a/aws_okta_keyman/okta_saml.py b/aws_okta_keyman/okta_saml.py new file mode 100644 index 0000000..03d3d11 --- /dev/null +++ b/aws_okta_keyman/okta_saml.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +# +# Credits: Portions of this code were copied/modified from +# https://github.com/ThoughtWorksInc/oktaauth +# +# Copyright (c) 2015, Peter Gillard-Moss +# All rights reserved. + +# Permission to use, copy, modify, and/or distribute this software for any +# purpose with or without fee is hereby granted, provided that the above +# copyright notice and this permission notice appear in all copies. + +# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +"""This contains all of the Okta SML specific code.""" +from __future__ import unicode_literals +import base64 +import logging + +import bs4 +import requests + +from aws_okta_keyman import okta + + +LOG = logging.getLogger(__name__) + + +class OktaSaml(okta.Okta): + """Handle the SAML part of talking to Okta.""" + + def assertion(self, saml): + """Parse the assertion from the SAML response.""" + assertion = '' + soup = bs4.BeautifulSoup(saml, 'html.parser') + for inputtag in soup.find_all('input'): + if inputtag.get('name') == 'SAMLResponse': + assertion = inputtag.get('value') + return base64.b64decode(assertion) + + def get_assertion(self, appid, apptype): + """Call Okta and get the assertion.""" + path = '{url}/home/{apptype}/{appid}'.format( + url=self.base_url, apptype=apptype, appid=appid) + resp = self.session.get(path, + params={'onetimetoken': self.session_token}) + LOG.debug(resp.__dict__) + + try: + resp.raise_for_status() + except (requests.exceptions.HTTPError, + requests.exceptions.ConnectionError) as err: + if err.response.status_code == 404: + LOG.fatal("Provided App ID {} not found".format(appid)) + else: + LOG.error('Unknown error: {msg}'.format( + msg=str(err.response.__dict__))) + raise okta.UnknownError() + + return self.assertion(resp.text) diff --git a/aws_okta_keyman/test/keyman_test.py b/aws_okta_keyman/test/keyman_test.py index 5254145..7b49b45 100644 --- a/aws_okta_keyman/test/keyman_test.py +++ b/aws_okta_keyman/test/keyman_test.py @@ -140,7 +140,7 @@ def test_handle_appid_selection_when_appid_provided(self, config_mock): self.assertEqual(keyman.handle_appid_selection(), None) @mock.patch('aws_okta_keyman.keyman.Config') - @mock.patch('aws_okta_keyman.keyman.okta') + @mock.patch('aws_okta_keyman.keyman.okta_saml') def test_init_okta(self, okta_mock, _config_mock): okta_mock.OktaSaml = mock.MagicMock() keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar', '-a', 'baz']) @@ -151,7 +151,7 @@ def test_init_okta(self, okta_mock, _config_mock): ]) @mock.patch('aws_okta_keyman.keyman.Config') - @mock.patch('aws_okta_keyman.keyman.okta') + @mock.patch('aws_okta_keyman.keyman.okta_saml') def test_init_okta_with_oktapreview(self, okta_mock, _config_mock): okta_mock.OktaSaml = mock.MagicMock() keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar', '-a', 'baz']) @@ -163,7 +163,7 @@ def test_init_okta_with_oktapreview(self, okta_mock, _config_mock): ]) @mock.patch('aws_okta_keyman.keyman.Config') - @mock.patch('aws_okta_keyman.keyman.okta') + @mock.patch('aws_okta_keyman.keyman.okta_saml') def test_init_okta_with_empty_input(self, okta_mock, _config_mock): okta_mock.EmptyInput = BaseException okta_mock.OktaSaml = mock.MagicMock() @@ -198,12 +198,62 @@ def test_auth_okta_mfa(self, _config_mock): 'c') keyman.okta_client.validate_mfa.return_value = True keyman.user_input = mock.MagicMock() - keyman.user_input.return_value = "000000" + keyman.user_input.return_value = '000000' keyman.auth_okta() keyman.okta_client.validate_mfa.assert_has_calls([ - mock.call('a', 'b', "000000") + mock.call('a', 'b', '000000'), + ]) + + @mock.patch('aws_okta_keyman.keyman.Config') + def test_auth_okta_mfa_retry(self, _config_mock): + keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar', '-a', 'baz']) + keyman.okta_client = mock.MagicMock() + keyman.okta_client.auth.side_effect = okta.PasscodeRequired('a', 'b', + 'c') + keyman.okta_client.validate_mfa.side_effect = [False, True] + keyman.user_input = mock.MagicMock() + keyman.user_input.return_value = '000000' + + keyman.auth_okta() + + keyman.okta_client.validate_mfa.assert_has_calls([ + mock.call('a', 'b', '000000'), + mock.call('a', 'b', '000000'), + ]) + + @mock.patch('aws_okta_keyman.keyman.Config') + def test_auth_okta_answer(self, _config_mock): + keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar', '-a', 'baz']) + keyman.okta_client = mock.MagicMock() + factor = {'id': 'foo', 'profile': {'questionText': 'a'}} + keyman.okta_client.auth.side_effect = okta.AnswerRequired(factor, 'b') + keyman.okta_client.validate_answer.return_value = True + keyman.user_input = mock.MagicMock() + keyman.user_input.return_value = 'Someanswer' + + keyman.auth_okta() + + keyman.okta_client.validate_answer.assert_has_calls([ + mock.call('foo', 'b', 'Someanswer'), + ]) + + @mock.patch('aws_okta_keyman.keyman.Config') + def test_auth_okta_answer_retry(self, _config_mock): + keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar', '-a', 'baz']) + keyman.okta_client = mock.MagicMock() + factor = {'id': 'foo', 'profile': {'questionText': 'a'}} + keyman.okta_client.auth.side_effect = okta.AnswerRequired(factor, 'b') + keyman.okta_client.validate_answer.side_effect = [False, True] + keyman.user_input = mock.MagicMock() + keyman.user_input.return_value = 'Someanswer' + + keyman.auth_okta() + + keyman.okta_client.validate_answer.assert_has_calls([ + mock.call('foo', 'b', 'Someanswer'), + mock.call('foo', 'b', 'Someanswer'), ]) @mock.patch('aws_okta_keyman.keyman.Config') @@ -254,6 +304,15 @@ def test_start_session(self, aws_mock, _config_mock): mock.call.Session('assertion', profile=mock.ANY) ]) + @mock.patch('aws_okta_keyman.keyman.Config') + def test_start_session_failure(self, _config_mock): + keyman = Keyman(['foo', '-o', 'foo', '-u', 'bar', '-a', 'baz']) + keyman.okta_client = mock.MagicMock() + keyman.okta_client.get_assertion.side_effect = okta.UnknownError + + with self.assertRaises(SystemExit): + keyman.start_session() + @mock.patch('aws_okta_keyman.keyman.Config') def test_aws_auth_loop(self, config_mock): config_mock().reup = False diff --git a/aws_okta_keyman/test/okta_test.py b/aws_okta_keyman/test/okta_test.py index 67b17b4..dbb36e7 100644 --- a/aws_okta_keyman/test/okta_test.py +++ b/aws_okta_keyman/test/okta_test.py @@ -65,7 +65,7 @@ }, 'stateToken': 'token', } -MFA_CHALLENGE_RESPONSE_PASSCODE = { +MFA_CHALLENGE_OKTA_OTP = { 'status': 'MFA_REQUIRED', '_embedded': { 'factors': [ @@ -78,6 +78,76 @@ }, 'stateToken': 'token', } +MFA_CHALLENGE_GOOGLE_OTP = { + 'status': 'MFA_REQUIRED', + '_embedded': { + 'factors': [ + { + 'factorType': 'token:software:totp', + 'provider': 'GOOGLE', + 'id': 'abcd', + } + ] + }, + 'stateToken': 'token', +} +MFA_CHALLENGE_SMS_OTP = { + 'status': 'MFA_REQUIRED', + '_embedded': { + 'factors': [ + { + 'factorType': 'sms', + 'provider': 'OKTA', + 'id': 'abcd', + 'profile': {'phoneNumber': '(xxx) xxx-1234'}, + } + ] + }, + 'stateToken': 'token', +} +MFA_CHALLENGE_CALL_OTP = { + 'status': 'MFA_REQUIRED', + '_embedded': { + 'factors': [ + { + 'factorType': 'call', + 'provider': 'OKTA', + 'id': 'abcd', + 'profile': {'phoneNumber': '(xxx) xxx-1234'}, + } + ] + }, + 'stateToken': 'token', +} +MFA_CHALLENGE_QUESTION = { + 'status': 'MFA_REQUIRED', + '_embedded': { + 'factors': [ + { + 'factorType': 'question', + 'provider': 'OKTA', + 'id': 'abcd', + 'profile': + {'question': 'what_is_your_quest?', + 'questionText': 'What is your quest?'}, + } + ] + }, + 'stateToken': 'token', +} +MFA_CHALLENGE_RSA_TOKEN = { + 'status': 'MFA_REQUIRED', + '_embedded': { + 'factors': [ + { + 'factorType': 'token', + 'provider': 'RSA', + 'id': 'abcd', + } + ] + }, + 'stateToken': 'token', +} MFA_WAITING_RESPONSE = { 'status': 'MFA_CHALLENGE', 'factorResult': 'WAITING', @@ -220,21 +290,60 @@ def test_set_token(self): client.set_token(SUCCESS_RESPONSE) self.assertEquals(client.session_token, 'XXXTOKENXXX') + def test_validate_mfa(self): + client = okta.Okta('organization', 'username', 'password') + client.send_user_response = mock.MagicMock(name='send_user_response') + client.send_user_response.return_value = {True} + client.set_token = mock.MagicMock() + ret = client.validate_mfa('fid', 'token', '123456') + self.assertEquals(ret, True) + client.set_token.assert_called_with({True}) + def test_validate_mfa_too_short(self): client = okta.Okta('organization', 'username', 'password') ret = client.validate_mfa('fid', 'token', '123') self.assertEquals(False, ret) - def test_validate_mfa_invalid_token(self): + def test_validate_mfa_failed(self): + client = okta.Okta('organization', 'username', 'password') + client.send_user_response = mock.MagicMock(name='send_user_response') + client.send_user_response.return_value = False + client.set_token = mock.MagicMock() + ret = client.validate_mfa('fid', 'token', '123456') + self.assertEquals(False, ret) + + def test_validate_answer(self): + client = okta.Okta('organization', 'username', 'password') + client.send_user_response = mock.MagicMock(name='send_user_response') + client.send_user_response.return_value = {True} + client.set_token = mock.MagicMock() + ret = client.validate_answer('fid', 'token', '123456') + self.assertEquals(ret, True) + client.set_token.assert_called_with({True}) + + def test_validate_answer_too_short(self): + client = okta.Okta('organization', 'username', 'password') + ret = client.validate_answer('fid', 'token', '') + self.assertEquals(False, ret) + + def test_validate_answer_failed(self): + client = okta.Okta('organization', 'username', 'password') + client.send_user_response = mock.MagicMock(name='send_user_response') + client.send_user_response.return_value = False + client.set_token = mock.MagicMock() + ret = client.validate_answer('fid', 'token', '123456') + self.assertEquals(False, ret) + + def test_send_user_response(self): client = okta.Okta('organization', 'username', 'password') resp = requests.Response() - resp.status_code = 403 + resp.status_code = 200 + resp.body = 'Dat' client._request = mock.MagicMock(name='_request') - client._request.side_effect = requests.exceptions.HTTPError( - response=resp) + client._request.return_value = resp - ret = client.validate_mfa('fid', 'token', '123456') - self.assertEquals(False, ret) + ret = client.send_user_response('fid', 'token', '123456', 'passCode') + self.assertEquals(200, ret.status_code) client._request.assert_has_calls([ mock.call( @@ -242,25 +351,40 @@ def test_validate_mfa_invalid_token(self): {'fid': 'fid', 'stateToken': 'token', 'passCode': '123456'}) ]) - def test_validate_mfa_unknown_error(self): + def test_send_user_response_invalid_token(self): client = okta.Okta('organization', 'username', 'password') resp = requests.Response() - resp.status_code = 500 - resp.body = 'Something bad happened' + resp.status_code = 403 + client._request = mock.MagicMock(name='_request') + client._request.side_effect = requests.exceptions.HTTPError( + response=resp) + + ret = client.send_user_response('fid', 'token', '123456', 'passCode') + self.assertEquals(False, ret) + + def test_send_user_response_retries_exceeded(self): + client = okta.Okta('organization', 'username', 'password') + resp = requests.Response() + resp.status_code = 401 + resp.body = 'Too many failures' client._request = mock.MagicMock(name='_request') client._request.side_effect = requests.exceptions.HTTPError( response=resp) with self.assertRaises(okta.UnknownError): - client.validate_mfa('fid', 'token', '123456') + client.send_user_response('fid', 'token', '123456', 'passCode') - def test_validate_mfa(self): + def test_send_user_response_unknown_error(self): client = okta.Okta('organization', 'username', 'password') + resp = requests.Response() + resp.status_code = 500 + resp.body = 'Something bad happened' client._request = mock.MagicMock(name='_request') - client._request.return_value = SUCCESS_RESPONSE - ret = client.validate_mfa('fid', 'token', '123456') - self.assertEquals(ret, True) - self.assertEquals(client.session_token, 'XXXTOKENXXX') + client._request.side_effect = requests.exceptions.HTTPError( + response=resp) + + with self.assertRaises(okta.UnknownError): + client.send_user_response('fid', 'token', '123456', 'passCode') def test_okta_verify(self): client = okta.Okta('organization', 'username', 'password') @@ -366,41 +490,87 @@ def test_auth_requires_mfa_enroll(self): def test_handle_mfa_response_trigger_okta_verify(self): client = okta.Okta('organization', 'username', 'password') - client.okta_verify = mock.MagicMock( - name='okta_verify') + client.handle_push_factors = mock.MagicMock( + name='handle_push_factors') + client.handle_push_factors.return_value = True - ret = client.handle_mfa_response(MFA_CHALLENGE_RESPONSE_OKTA_VERIFY) + client.handle_mfa_response(MFA_CHALLENGE_RESPONSE_OKTA_VERIFY) - self.assertEquals(ret, True) - client.okta_verify.assert_has_calls([ - mock.call('abcd', 'token') + client.handle_push_factors.assert_has_calls([ + mock.call( + [{'factorType': 'push', 'provider': 'OKTA', 'id': 'abcd'}], + 'token') + ]) + + def test_handle_mfa_response_trigger_duo_auth(self): + client = okta.Okta('organization', 'username', 'password') + client.handle_push_factors = mock.MagicMock( + name='handle_push_factors') + client.handle_push_factors.return_value = True + + client.handle_mfa_response(MFA_CHALLENGE_RESPONSE_DUO_AUTH) + client.handle_push_factors.assert_has_calls([ + mock.call([{'factorType': 'web', 'provider': 'DUO', 'id': 'abcd'}], + 'token') ]) - def test_handle_mfa_response_trigger_okta_verify_canceled(self): + def test_handle_mfa_response_trigger_sms_otp(self): client = okta.Okta('organization', 'username', 'password') - client.okta_verify = mock.MagicMock( - name='okta_verify') - client.okta_verify.return_value = None + client.handle_push_factors = mock.MagicMock() + client.handle_push_factors.return_value = False + client.handle_response_factors = mock.MagicMock( + name='handle_response_factors') + passcode = okta.PasscodeRequired('', '', '') + client.handle_response_factors.side_effect = passcode + + with self.assertRaises(okta.PasscodeRequired): + client.handle_mfa_response(MFA_CHALLENGE_SMS_OTP) + client.handle_response_factors.assert_has_calls([ + mock.call([{'factorType': 'sms', 'provider': 'OKTA', 'id': 'abcd', + 'profile': {'phoneNumber': '(xxx) xxx-1234'}}], + 'token') + ]) + + def test_handle_mfa_response_unsupported(self): + client = okta.Okta('organization', 'username', 'password') + client.handle_push_factors = mock.MagicMock() + client.handle_push_factors.return_value = False with self.assertRaises(okta.UnknownError): - client.handle_mfa_response(MFA_CHALLENGE_RESPONSE_OKTA_VERIFY) + client.handle_mfa_response(MFA_CHALLENGE_RSA_TOKEN) - def test_handle_mfa_response_trigger_duo_auth(self): + def test_handle_push_factors_empty(self): client = okta.Okta('organization', 'username', 'password') - client.duo_auth = mock.MagicMock(name='duo_auth') - client.duo_auth.return_value = True - ret = client.handle_mfa_response(MFA_CHALLENGE_RESPONSE_DUO_AUTH) - self.assertEquals(ret, True) - client.duo_auth.assert_has_calls([ + ret = client.handle_push_factors([], 'token') + + self.assertEqual(ret, False) + + def test_handle_push_factors_okta_verify(self): + client = okta.Okta('organization', 'username', 'password') + client.okta_verify = mock.MagicMock(name='okta_verify') + client.okta_verify.return_value = True + factor = MFA_CHALLENGE_RESPONSE_OKTA_VERIFY['_embedded']['factors'] + + ret = client.handle_push_factors(factor, 'token') + + self.assertEqual(ret, True) + client.okta_verify.assert_has_calls([ mock.call('abcd', 'token') ]) - def test_handle_mfa_response_throws_passcode_required(self): + def test_handle_push_factors_duo_auth(self): client = okta.Okta('organization', 'username', 'password') + client.duo_auth = mock.MagicMock(name='duo_auth') + client.duo_auth.return_value = True + duo_factor = MFA_CHALLENGE_RESPONSE_DUO_AUTH['_embedded']['factors'] - with self.assertRaises(okta.PasscodeRequired): - client.handle_mfa_response(MFA_CHALLENGE_RESPONSE_PASSCODE) + ret = client.handle_push_factors(duo_factor, 'token') + + self.assertEqual(ret, True) + client.duo_auth.assert_has_calls([ + mock.call('abcd', 'token') + ]) def test_mfa_wait_loop_success(self): client = okta.Okta('organization', 'username', 'password') @@ -472,6 +642,71 @@ def test_mfa_wait_loop_user_cancel(self): ret = client.mfa_wait_loop(MFA_WAITING_RESPONSE, data, sleep=0) self.assertEquals(ret, None) + def test_handle_response_factors_none(self): + client = okta.Okta('organization', 'username', 'password') + ret = client.handle_response_factors([], 'foo') + self.assertEqual(ret, None) + + def test_handle_response_factors_sms(self): + client = okta.Okta('organization', 'username', 'password') + client.request_otp = mock.MagicMock() + with self.assertRaises(okta.PasscodeRequired): + client.handle_response_factors( + MFA_CHALLENGE_SMS_OTP['_embedded']['factors'], + 'foo') + client.request_otp.assert_has_calls([mock.call('abcd', 'foo', 'SMS')]) + + def test_handle_response_factors_call(self): + client = okta.Okta('organization', 'username', 'password') + client.request_otp = mock.MagicMock() + with self.assertRaises(okta.PasscodeRequired): + client.handle_response_factors( + MFA_CHALLENGE_CALL_OTP['_embedded']['factors'], + 'foo') + client.request_otp.assert_has_calls([ + mock.call('abcd', 'foo', 'phone call') + ]) + + def test_handle_response_factors_question(self): + client = okta.Okta('organization', 'username', 'password') + client.request_otp = mock.MagicMock() + with self.assertRaises(okta.AnswerRequired): + client.handle_response_factors( + MFA_CHALLENGE_QUESTION['_embedded']['factors'], + 'foo') + + def test_handle_response_factors_google(self): + client = okta.Okta('organization', 'username', 'password') + client.request_otp = mock.MagicMock() + with self.assertRaises(okta.PasscodeRequired): + client.handle_response_factors( + MFA_CHALLENGE_GOOGLE_OTP['_embedded']['factors'], + 'foo') + + def test_handle_response_factors_okta(self): + client = okta.Okta('organization', 'username', 'password') + client.request_otp = mock.MagicMock() + with self.assertRaises(okta.PasscodeRequired): + client.handle_response_factors( + MFA_CHALLENGE_OKTA_OTP['_embedded']['factors'], + 'foo') + + def test_request_otp(self): + client = okta.Okta('organization', 'username', 'password') + client._request = mock.MagicMock(name='_request') + client._request.side_effect = [ + MFA_WAITING_RESPONSE, + MFA_WAITING_RESPONSE, + MFA_WAITING_RESPONSE, + MFA_TIMEOUT_RESPONSE, + ] + + client.request_otp('foo', 'bar', 'sms') + client._request.assert_has_calls([ + mock.call('/authn/factors/foo/verify', + {'fid': 'foo', 'stateToken': 'bar'}) + ]) + class PasscodeRequiredTest(unittest.TestCase): def test_class_properties(self): @@ -484,3 +719,15 @@ def test_class_properties(self): self.assertEquals(error_response.fid, 'fid') self.assertEquals(error_response.state_token, 'state_token') self.assertEquals(error_response.provider, 'provider') + + +class AnswerRequiredTest(unittest.TestCase): + def test_class_properties(self): + error_response = None + try: + raise okta.AnswerRequired('factor', 'state_token') + except okta.AnswerRequired as err: + error_response = err + + self.assertEquals(error_response.factor, 'factor') + self.assertEquals(error_response.state_token, 'state_token')