Skip to content

Commit

Permalink
Merge pull request #12 from nathan-v/Feature_additional_factor_support
Browse files Browse the repository at this point in the history
Support more MFA
  • Loading branch information
nathan-v authored May 2, 2018
2 parents 5fd500f + 1a56f6c commit a40e5c4
Show file tree
Hide file tree
Showing 7 changed files with 571 additions and 108 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ successful the user may close the browser or tab.
* Duo Auth
* Okta OTP
* Google Auth OTP
* SMS OTP
* Call OTP
* Question/Answer

Windows Hello, U2F, email, and physical token (RSA, Symantec) are not supported
at this time.

## Multiple AWS Roles

Expand Down
34 changes: 25 additions & 9 deletions aws_okta_keyman/keyman.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import rainbow_logging_handler
import requests

from aws_okta_keyman import aws, okta
from aws_okta_keyman import aws, okta, okta_saml
from aws_okta_keyman.config import Config
from aws_okta_keyman.metadata import __desc__, __version__

Expand Down Expand Up @@ -123,13 +123,14 @@ def init_okta(self, password):
"""
try:
if self.config.oktapreview is True:
self.okta_client = okta.OktaSaml(self.config.org,
self.config.username,
password, oktapreview=True)
self.okta_client = okta_saml.OktaSaml(self.config.org,
self.config.username,
password,
oktapreview=True)
else:
self.okta_client = okta.OktaSaml(self.config.org,
self.config.username,
password)
self.okta_client = okta_saml.OktaSaml(self.config.org,
self.config.username,
password)

except okta.EmptyInput:
self.log.fatal('Cannot enter a blank string for any input')
Expand All @@ -156,6 +157,17 @@ def auth_okta(self):
verified = self.okta_client.validate_mfa(err.fid,
err.state_token,
passcode)
except okta.AnswerRequired as err:
self.log.warning('Question/Answer MFA response required.')
self.log.warning("{}".format(
err.factor['profile']['questionText'])
)
verified = False
while not verified:
answer = self.user_input('Answer: ')
verified = self.okta_client.validate_answer(err.factor['id'],
err.state_token,
answer)
except okta.UnknownError as err:
self.log.fatal("Fatal error: {}".format(err))
sys.exit(1)
Expand All @@ -173,8 +185,12 @@ def handle_multiple_roles(self, session):

def start_session(self):
"""Initialize AWS session object."""
assertion = self.okta_client.get_assertion(appid=self.config.appid,
apptype='amazon_aws')
try:
assertion = self.okta_client.get_assertion(appid=self.config.appid,
apptype='amazon_aws')
except okta.UnknownError:
sys.exit(1)

return aws.Session(assertion, profile=self.config.name)

def aws_auth_loop(self):
Expand Down
2 changes: 1 addition & 1 deletion aws_okta_keyman/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# Copyright 2018 Nathan V
"""Package metadata."""

__version__ = '0.3.3'
__version__ = '0.4.0'
__desc__ = 'AWS Okta Keyman'
__desc_long__ = ('''
===============
Expand Down
184 changes: 127 additions & 57 deletions aws_okta_keyman/okta.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""This contains all of the Okta-specific code we need."""
"""This contains the Okta client code."""
from __future__ import unicode_literals
import base64
import logging
import time
from multiprocessing import Process
import sys
import webbrowser

import bs4
import requests

from aws_okta_keyman.duo import Duo
Expand Down Expand Up @@ -65,6 +63,15 @@ def __init__(self, fid, state_token, provider):
super(PasscodeRequired, self).__init__()


class AnswerRequired(BaseException):
"""A 2FA Passcode Must Be Entered."""

def __init__(self, factor, state_token):
self.factor = factor
self.state_token = state_token
super(AnswerRequired, self).__init__()


class OktaVerifyRequired(BaseException):
"""OktaVerify Authentication Is Required."""

Expand Down Expand Up @@ -160,24 +167,61 @@ def validate_mfa(self, fid, state_token, passcode):
Returns:
True/False whether or not authentication was successful
"""
if len(passcode) != 6:
LOG.error('Passcodes must be 6 digits')
if len(passcode) > 6 or len(passcode) < 5:
LOG.error('Passcodes must be 5 or 6 digits')
return False

valid = self.send_user_response(fid, state_token, passcode, 'passCode')
if valid:
self.set_token(valid)
return True
else:
return False

def validate_answer(self, fid, state_token, answer):
"""Validate an Okta user with Question-based MFA.
Takes in the supplied Factor ID (fid), State Token and user supplied
Passcode, and validates the auth. If successful, sets the session
token. If invalid, raises an exception.
Args:
fid: Okta Factor ID (returned in the PasscodeRequired exception)
state_token: State Tken (returned in the PasscodeRequired
exception)
answer: The user-supplied answer to verify
Returns:
True/False whether or not authentication was successful
"""
if len(answer) == 0:
LOG.error('Answer cannot be blank')
return False

valid = self.send_user_response(fid, state_token, answer, 'answer')
if valid:
self.set_token(valid)
return True
else:
return False

def send_user_response(self, fid, state_token, user_response, resp_type):
"""Call Okta with a factor response and verify it."""
path = '/authn/factors/{fid}/verify'.format(fid=fid)
data = {'fid': fid,
'stateToken': state_token,
'passCode': passcode}
resp_type: user_response}
try:
ret = self._request(path, data)
return self._request(path, data)
except requests.exceptions.HTTPError as err:
if err.response.status_code == 403:
LOG.error('Invalid Passcode Detected')
return False
raise UnknownError(err.response.body)
if err.response.status_code == 401:
LOG.error('Invalid Passcode Retries Exceeded')
raise UnknownError('Retries exceeded')

self.set_token(ret)
return True
raise UnknownError(err.response.body)

def okta_verify(self, fid, state_token):
"""Trigger an Okta Push Verification and waits.
Expand Down Expand Up @@ -309,61 +353,87 @@ def handle_mfa_response(self, ret):
"""In the case of an MFA response evaluate the response and handle
accordingly based on available MFA factors.
"""
otp_possible = False
otp_provider = None
response_types = ['sms', 'question', 'call', 'token:software:totp']
push_factors = []
response_factors = []
for factor in ret['_embedded']['factors']:
if factor['factorType'] == 'push':
if self.okta_verify(factor['id'], ret['stateToken']):
LOG.debug('Okta Verify factor found')
push_factors.append(factor)
if factor['provider'] == 'DUO':
LOG.debug('Duo Auth factor found')
push_factors.append(factor)
if factor['factorType'] in response_types:
LOG.debug("{} factor found".format(factor['factorType']))
response_factors.append(factor)

if self.handle_push_factors(push_factors, ret['stateToken']):
return True

self.handle_response_factors(response_factors, ret['stateToken'])

# If we haven't returned or raised yet the factor requested isn't
# supported
LOG.debug("Factors from Okta: {}".format(
ret['_embedded']['factors']))
LOG.fatal('MFA type in use is unsupported')
raise UnknownError('MFA type in use is unsupported')

def handle_push_factors(self, factors, state_token):
"""Handle any push-type factors."""
for factor in factors:
if factor['factorType'] == 'push':
LOG.debug('Okta Verify factor found')
if self.okta_verify(factor['id'], state_token):
return True
if factor['provider'] == 'DUO':
if self.duo_auth(factor['id'], ret['stateToken']):
LOG.debug('Duo Auth factor found')
if self.duo_auth(factor['id'], state_token):
return True
return False

def handle_response_factors(self, factors, state_token):
"""Handle any OTP-type factors."""
otp_provider = None
otp_factor = None
for factor in factors:
if factor['factorType'] == 'sms':
self.request_otp(factor['id'], state_token, 'SMS')
phone = factor['profile']['phoneNumber']
otp_provider = "SMS ({})".format(phone)
otp_factor = factor['id']
break
if factor['factorType'] == 'call':
self.request_otp(factor['id'], state_token, 'phone call')
phone = factor['profile']['phoneNumber']
otp_provider = "call ({})".format(phone)
otp_factor = factor['id']
break
if factor['factorType'] == 'question':
raise AnswerRequired(factor, state_token)
if factor['factorType'] == 'token:software:totp':
# Handle OTP separately in case we can do Okta or Duo but fail
# then we can fall back to OTP
LOG.debug('Software OTP option found')
otp_provider = factor['provider']
otp_possible = True
otp_factor = factor['id']

if otp_possible:
if otp_provider:
raise PasscodeRequired(
fid=factor['id'],
state_token=ret['stateToken'],
fid=otp_factor,
state_token=state_token,
provider=otp_provider)
else:
# Log out the factors to make debugging MFA issues easier
LOG.debug("Factors from Okta: {}".format(
ret['_embedded']['factors']))
LOG.fatal('MFA type in use is unsupported')
raise UnknownError('MFA type in use is unsupported')


class OktaSaml(Okta):
"""Handle the SAML part of talking to Okta."""

def assertion(self, saml):
"""Parse the assertion from the SAML response."""
assertion = ''
soup = bs4.BeautifulSoup(saml, 'html.parser')
for inputtag in soup.find_all('input'):
if inputtag.get('name') == 'SAMLResponse':
assertion = inputtag.get('value')
return base64.b64decode(assertion)

def get_assertion(self, appid, apptype):
"""Call Okta and get the assertion."""
path = '{url}/home/{apptype}/{appid}'.format(
url=self.base_url, apptype=apptype, appid=appid)
resp = self.session.get(path,
params={'onetimetoken': self.session_token})
LOG.debug(resp.__dict__)

try:
resp.raise_for_status()
except (requests.exceptions.HTTPError,
requests.exceptions.ConnectionError) as err:
LOG.error('Unknown error: {msg}'.format(
msg=str(err.response.__dict__)))
raise UnknownError()

return self.assertion(resp.text)
def request_otp(self, fid, state_token, otp_type):
"""Trigger an OTP call, SMS, or other and return
We trigger the push, and then immediately return as the next step is
essentially just an OTP code entry
Args:
fid: Okta Factor ID used to trigger the push
state_token: State Token allowing us to trigger the push
otp_type: String shown in log for OTP type
"""
LOG.warning("Okta {} being requested...".format(otp_type))
path = '/authn/factors/{fid}/verify'.format(fid=fid)
data = {'fid': fid,
'stateToken': state_token}
self._request(path, data)
65 changes: 65 additions & 0 deletions aws_okta_keyman/okta_saml.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# -*- coding: utf-8 -*-
#
# Credits: Portions of this code were copied/modified from
# https://github.com/ThoughtWorksInc/oktaauth
#
# Copyright (c) 2015, Peter Gillard-Moss
# All rights reserved.

# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.

# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""This contains all of the Okta SML specific code."""
from __future__ import unicode_literals
import base64
import logging

import bs4
import requests

from aws_okta_keyman import okta


LOG = logging.getLogger(__name__)


class OktaSaml(okta.Okta):
"""Handle the SAML part of talking to Okta."""

def assertion(self, saml):
"""Parse the assertion from the SAML response."""
assertion = ''
soup = bs4.BeautifulSoup(saml, 'html.parser')
for inputtag in soup.find_all('input'):
if inputtag.get('name') == 'SAMLResponse':
assertion = inputtag.get('value')
return base64.b64decode(assertion)

def get_assertion(self, appid, apptype):
"""Call Okta and get the assertion."""
path = '{url}/home/{apptype}/{appid}'.format(
url=self.base_url, apptype=apptype, appid=appid)
resp = self.session.get(path,
params={'onetimetoken': self.session_token})
LOG.debug(resp.__dict__)

try:
resp.raise_for_status()
except (requests.exceptions.HTTPError,
requests.exceptions.ConnectionError) as err:
if err.response.status_code == 404:
LOG.fatal("Provided App ID {} not found".format(appid))
else:
LOG.error('Unknown error: {msg}'.format(
msg=str(err.response.__dict__)))
raise okta.UnknownError()

return self.assertion(resp.text)
Loading

0 comments on commit a40e5c4

Please sign in to comment.