Skip to content

Commit

Permalink
Let user set hashing algo and iterations, fix #112
Browse files Browse the repository at this point in the history
Cleanup tests
  • Loading branch information
Federico Ceratto committed Feb 9, 2017
1 parent fbea609 commit ebb2777
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 260 deletions.
89 changes: 72 additions & 17 deletions cork/cork.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class BaseCork(object):

def __init__(self, directory=None, backend=None, email_sender=None,
initialize=False, session_domain=None, smtp_server=None,
smtp_url='localhost', session_key_name=None):
smtp_url='localhost', session_key_name=None,
preferred_hashing_algorithm=None, pbkdf2_iterations=None):
"""Auth/Authorization/Accounting class
:param directory: configuration directory
Expand All @@ -87,14 +88,29 @@ def __init__(self, directory=None, backend=None, email_sender=None,
:param roles_fname: roles filename (without .json), defaults to 'roles'
:type roles_fname: str.
"""
if preferred_hashing_algorithm not in ("argon2", "PBKDF2sha1",
"PBKDF2sha256", "scrypt"):
raise Exception("preferred_hashing_algorithm must be 'argon2',"
" 'PBKDF2sha1', 'PBKDF2sha256' or 'scrypt'")
elif preferred_hashing_algorithm.startswith("PBKDF2") \
and not pbkdf2_iterations:
raise Exception("pbkdf2_iterations must be set")
elif preferred_hashing_algorithm == 'scrypt' and not scrypt_available:
raise Exception("scrypt.hash required."
" Please install the scrypt library.")
elif preferred_hashing_algorithm == 'argon2' and not argon2_available:
raise Exception("argon2 required."
" Please install the argon2 library.")

if smtp_server:
smtp_url = smtp_server
self.mailer = Mailer(email_sender, smtp_url)
self.password_reset_timeout = 3600 * 24
self.session_domain = session_domain
self.session_key_name = session_key_name or 'beaker.session'
self.preferred_hashing_algorithm = 'PBKDF2'
self.saltlength = { 'PBKDF2':32, 'scrypt':32, 'argon2':57 }
self.preferred_hashing_algorithm = preferred_hashing_algorithm
self.pbkdf2_iterations = pbkdf2_iterations

# Setup JsonBackend by default for backward compatibility.
if backend is None:
Expand Down Expand Up @@ -621,18 +637,20 @@ def _hash(self, username, pwd, salt=None, algo=None):
if algo is None:
algo = self.preferred_hashing_algorithm

if algo == 'PBKDF2':
return self._hash_pbkdf2(self, username, pwd, salt=salt)
if algo == 'PBKDF2sha1':
return self._hash_pbkdf2_sha1(username, pwd, salt=salt)

if algo == 'PBKDF2sha256':
return self._hash_pbkdf2_sha256(username, pwd, salt=salt)

if algo == 'scrypt':
return self._hash_scrypt(self, username, pwd, salt=salt)
return self._hash_scrypt(username, pwd, salt=salt)

if algo == 'argon2':
return self._hash_argon2(self, username, pwd, salt=salt)
return self._hash_argon2(username, pwd, salt=salt)

raise RuntimeError("Unknown hashing algorithm requested: %s" % algo)

@staticmethod
def _hash_scrypt(self, username, pwd, salt=None):
"""Hash username and password, generating salt value if required
Use scrypt.
Expand All @@ -648,17 +666,22 @@ def _hash_scrypt(self, username, pwd, salt=None):

assert len(salt) == self.saltlength['scrypt'], "Incorrect salt length"

username = username.encode('utf-8')
assert isinstance(username, bytes)

pwd = pwd.encode('utf-8')
assert isinstance(pwd, bytes)

cleartext = "%s\0%s" % (username, pwd)
h = scrypt.hash(cleartext, salt)

# 's' for scrypt
hashed = b's' + salt + h
return b64encode(hashed)

@staticmethod
def _hash_pbkdf2(self, username, pwd, salt=None):
def _hash_pbkdf2_sha1(self, username, pwd, salt=None):
"""Hash username and password, generating salt value if required
Use PBKDF2 from Beaker
Use PBKDF2 with sha1
:returns: base-64 encoded str.
"""
Expand All @@ -675,13 +698,39 @@ def _hash_pbkdf2(self, username, pwd, salt=None):
assert isinstance(pwd, bytes)

cleartext = username + b'\0' + pwd
h = hashlib.pbkdf2_hmac('sha1', cleartext, salt, 10, dklen=32)
h = hashlib.pbkdf2_hmac('sha1', cleartext, salt,
self.pbkdf2_iterations, dklen=32)

# 'p' for PBKDF2
# 'p' for PBKDF2 with sha1
hashed = b'p' + salt + h
return b64encode(hashed)

@staticmethod
def _hash_pbkdf2_sha256(self, username, pwd, salt=None):
"""Hash username and password, generating salt value if required
Use PBKDF2 with sha256
:returns: base-64 encoded str.
"""
if salt is None:
salt = os.urandom(self.saltlength['PBKDF2'])

assert isinstance(salt, bytes)
assert len(salt) == self.saltlength['PBKDF2'], "Incorrect salt length"

username = username.encode('utf-8')
assert isinstance(username, bytes)

pwd = pwd.encode('utf-8')
assert isinstance(pwd, bytes)

cleartext = username + b'\0' + pwd
h = hashlib.pbkdf2_hmac('sha256', cleartext, salt,
self.pbkdf2_iterations, dklen=32)

# 'k' for PBKDF2 with sha256
hashed = b'k' + salt + h
return b64encode(hashed)

def _hash_argon2(self, username, pwd, salt=None):
"""Hash username and password, generating salt value if required
Use argon2
Expand Down Expand Up @@ -720,22 +769,28 @@ def _verify_password(self, username, pwd, salted_hash):
if isinstance(hash_type, int):
hash_type = chr(hash_type)

if hash_type == 'p': # PBKDF2
if hash_type == 'p': # PBKDF2 with sha1
saltend = self.saltlength['PBKDF2']+1
salt = decoded[1:saltend]
h = self._hash_pbkdf2_sha1(username, pwd, salt)
return salted_hash == h

if hash_type == 'k': # PBKDF2 with sha256
saltend = self.saltlength['PBKDF2']+1
salt = decoded[1:saltend]
h = self._hash_pbkdf2(self, username, pwd, salt)
h = self._hash_pbkdf2_sha256(username, pwd, salt)
return salted_hash == h

if hash_type == 's': # scrypt
saltend = self.saltlength['scrypt']+1
salt = decoded[1:saltend]
h = self._hash_scrypt(self, username, pwd, salt)
h = self._hash_scrypt(username, pwd, salt)
return salted_hash == h

if hash_type == 'a': # argon2
saltend = self.saltlength['argon2']+1
salt = decoded[1:saltend]
h = self._hash_argon2(self, username, pwd, salt)
h = self._hash_argon2(username, pwd, salt)
return salted_hash == h

raise RuntimeError("Unknown hashing algorithm in hash: %r" % decoded)
Expand Down
21 changes: 0 additions & 21 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,27 +43,6 @@ def mytmpdir(tmpdir):
tmpdir.join('password_reset_email.tpl').write("""Username:{{username}} Email:{{email_addr}} Code:{{reset_code}}""")
return tmpdir

# used by test_scrypt.py
@pytest.fixture
def aaa(mytmpdir):
aaa = Cork(mytmpdir, smtp_server='localhost', email_sender='test@localhost')
return aaa




# mb = self.setup_test_db()
# self.aaa = MockedUnauthenticatedCork(backend=mb,
# smtp_server='localhost', email_sender='test@localhost')
# cookie_name = None
# if hasattr(self, 'purge_test_db'):
# self.purge_test_db()
#
# del(self.aaa)
# cookie_name = None



def assert_is_redirect(e, path):
"""Check if an HTTPResponse is a redirect.
Expand Down
75 changes: 8 additions & 67 deletions tests/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,16 @@ def json_db_dir(tmpdir, templates_dir):
@pytest.fixture
def aaa(json_db_dir):
"""Setup a MockedAdminCork instance"""
aaa = MockedAdminCork(json_db_dir.strpath, smtp_server='localhost', email_sender='test@localhost')
aaa = MockedAdminCork(json_db_dir.strpath, smtp_server='localhost', email_sender='test@localhost',
preferred_hashing_algorithm='PBKDF2sha1', pbkdf2_iterations=100000)
aaa.mailer.use_threads = False
return aaa


@pytest.fixture
def aaa_unauth(json_db_dir):
"""Setup test directory and a MockedAdminCork instance"""
aaa = MockedUnauthenticatedCork(json_db_dir.strpath)
aaa = MockedUnauthenticatedCork(json_db_dir.strpath, preferred_hashing_algorithm='PBKDF2sha1', pbkdf2_iterations=100000)
aaa.mailer.use_threads = False
return aaa

Expand Down Expand Up @@ -101,12 +102,12 @@ def wrapper(*a, **kw):
# Tests

def test_init(json_db_dir):
Cork(json_db_dir.strpath)
Cork(json_db_dir.strpath, preferred_hashing_algorithm='PBKDF2sha1', pbkdf2_iterations=100000)


def test_initialize_storage(json_db_dir):
jb = JsonBackend(json_db_dir.strpath, initialize=True)
Cork(backend=jb)
Cork(backend=jb, preferred_hashing_algorithm='PBKDF2sha1', pbkdf2_iterations=100000)
assert json_db_dir.join('users.json').read() == '{}'
assert json_db_dir.join('roles.json').read() == '{}'
assert json_db_dir.join('register.json').read() == '{}'
Expand All @@ -122,7 +123,7 @@ def test_initialize_storage(json_db_dir):
def test_unable_to_save(json_db_dir):
bogus_dir = '/___inexisting_directory___'
with pytest.raises(BackendIOException):
Cork(bogus_dir, initialize=True)
Cork(bogus_dir, initialize=True, preferred_hashing_algorithm='PBKDF2sha1', pbkdf2_iterations=100000)


def test_loadjson_missing_file(aaa):
Expand All @@ -143,68 +144,8 @@ def test_loadjson_unchanged(aaa):
assert mtimes == aaa._store._mtimes


# Test PBKDF2-based password hashing

def test_password_hashing_PBKDF2(aaa):
shash = aaa._hash(u'user_foo', u'bogus_pwd')
assert isinstance(shash, bytes)
assert len(shash) == 88, "hash length should be 88 and is %d" % len(shash)
assert shash.endswith(b'='), "hash should end with '='"
assert aaa._verify_password('user_foo', 'bogus_pwd', shash) == True, \
"Hashing verification should succeed"


def test_hashlib_pbk():
# Hashlib works under py2 and py3 producing the same output.
# With iterations = 10 and dklen = 32 the output is also consistent with
# beaker under py2 as in the previous versions of Cork
import hashlib
cleartext = b'hello'
salt = b'hi'
h = hashlib.pbkdf2_hmac('sha1', cleartext, salt, 10, dklen=32)
assert b64encode(h) == b'QTH8vcCFLLqLhxCTnkz6sq+Un3B4RQgWjMPpRC9hfEY='

def test_password_hashing_PBKDF2_known_hash(aaa):
assert aaa.preferred_hashing_algorithm == 'PBKDF2'
salt = b's' * 32
shash = aaa._hash(u'user_foo', u'bogus_pwd', salt=salt)
assert shash == b'cHNzc3Nzc3Nzc3Nzc3Nzc3Nzc3Nzc3Nzc3Nzc3Nzc3Nzax44AxQgK6uD9q1YWxLos1ispCe1Z7T7pOFK1PwdWEs='

def test_password_hashing_PBKDF2_known_hash_2(aaa):
assert aaa.preferred_hashing_algorithm == 'PBKDF2'
salt = b'\0' * 32
shash = aaa._hash(u'user_foo', u'bogus_pwd', salt=salt)
assert shash == b'cAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA/8Uh4pyEOHoRz4j0lDzAmqb7Dvmo8GpeXwiKTDsuYFw='


def test_password_hashing_PBKDF2_known_hash_3(aaa):
assert aaa.preferred_hashing_algorithm == 'PBKDF2'
salt = b'x' * 32
shash = aaa._hash(u'user_foo', u'bogus_pwd', salt=salt)
assert shash == b'cHh4eHh4eHh4eHh4eHh4eHh4eHh4eHh4eHh4eHh4eHh4MEaIU5Op97lmvwX5NpVSTBP8jg8OlrN7c2K8K8tnNks='


def test_password_hashing_PBKDF2_incorrect_hash_len(aaa):
salt = b'x' * 31 # Incorrect length
with pytest.raises(AssertionError):
shash = aaa._hash(u'user_foo', u'bogus_pwd', salt=salt)


def test_password_hashing_PBKDF2_incorrect_hash_value(aaa):
shash = aaa._hash(u'user_foo', u'bogus_pwd')
assert len(shash) == 88, "hash length should be 88 and is %d" % len(shash)
assert shash.endswith(b'='), "hash should end with '='"
assert aaa._verify_password(u'user_foo', u'####', shash) == False, \
"Hashing verification should fail"
assert aaa._verify_password(u'###', u'bogus_pwd', shash) == False, \
"Hashing verification should fail"


def test_password_hashing_PBKDF2_collision(aaa):
salt = b'S' * 32
hash1 = aaa._hash(u'user_foo', u'bogus_pwd', salt=salt)
hash2 = aaa._hash(u'user_foobogus', u'_pwd', salt=salt)
assert hash1 != hash2, "Hash collision"




# Test password hashing for inexistent algorithms
Expand Down
Loading

0 comments on commit ebb2777

Please sign in to comment.