Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Onyx 26897 excluded token save part , Pls note -" It is not to review " #180

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ansible-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
- stable-2.10
- stable-2.11
- stable-2.12
- devel
# - devel
python:
- 3.9
runs-on: ubuntu-latest
Expand Down
62 changes: 40 additions & 22 deletions plugins/lookup/conjur_variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
from ansible.module_utils.urls import open_url
from ansible.utils.display import Display
import ssl
from pathlib import Path

display = Display()

Expand All @@ -128,7 +129,7 @@ def _load_identity_from_file(identity_path, appliance_url):
if not os.path.exists(identity_path):
return {}
# raise AnsibleError('Conjur identity file `{0}` was not found on the controlling host'
# .format(identity_path))
# .format(identity_path))

display.vvvv('Loading identity from: {0} for {1}'.format(identity_path, appliance_url))

Expand Down Expand Up @@ -163,26 +164,6 @@ def _encode_str(input_str):
return quote(input_str, safe='')


# Use credentials to retrieve temporary authorization token
def _fetch_conjur_token(conjur_url, account, username, api_key, validate_certs, cert_file):
conjur_url = '{0}/authn/{1}/{2}/authenticate'.format(conjur_url, account, _encode_str(username))
display.vvvv('Authentication request to Conjur at: {0}, with user: {1}'.format(
conjur_url,
_encode_str(username)))

response = open_url(conjur_url,
data=api_key,
method='POST',
validate_certs=validate_certs,
ca_path=cert_file)
code = response.getcode()
if code != 200:
raise AnsibleError('Failed to authenticate as \'{0}\' (got {1} response)'
.format(username, code))

return response.read()


def retry(retries, retry_interval):
"""
Custom retry decorator
Expand Down Expand Up @@ -212,6 +193,44 @@ def decorator(*args, **kwargs):
return parameters_wrapper


# Use credentials to retrieve temporary authorization token
def _fetch_conjur_token(conjur_url, account, username, api_key, validate_certs, cert_file):
conjur_url = '{0}/authn/{1}/{2}/authenticate'.format(conjur_url, account, _encode_str(username))
display.vvvv('Authentication request to Conjur at: {0}, with user: {1}'.format(
conjur_url,
_encode_str(username)))

response = open_url(conjur_url,
data=api_key,
method='POST',
validate_certs=validate_certs,
ca_path=cert_file)
code = response.getcode()
if response.getcode() == 200:
display.vvvv('Conjur token was successfully retrieved and authorized with {0} code and {1} username '.format(code, username))
return response.read()
if response.getcode() == 401:
raise AnsibleError('Conjur request has invalid authorization credentials as {0} and {1} response'.format(code, username))
if response.getcode() == 403:
raise AnsibleError('The controlling host\'s Conjur identity does not have authorization as \'{0}\' (got {1} response)'
.format(username, code))
if response.getcode() == 404:
raise AnsibleError('The token does not exist with {0} response '.format(code))
if response.getcode() == 500:
raise AnsibleError('Internal Server Error with {0} response'.format(code))

return response.read()


@retry(retries=5, retry_interval=10)
def _open_url(conjur_url, api_key=None, method=None, validate_certs=True, cert_file=None):
return open_url(conjur_url,
data=api_key,
method=method,
validate_certs=validate_certs,
ca_path=cert_file)


@retry(retries=5, retry_interval=10)
def _repeat_open_url(url, headers=None, method=None, validate_certs=True, ca_path=None):
return open_url(url,
Expand Down Expand Up @@ -351,7 +370,6 @@ def run(self, terms, variables=None, **kwargs):
.format(conf['authn_token_file']))
with open(conf['authn_token_file'], 'rb') as f:
token = f.read()

conjur_variable = _fetch_conjur_variable(
terms[0],
token,
Expand Down
32 changes: 30 additions & 2 deletions tests/unit/plugins/lookup/test_conjur_variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,34 @@ def test_fetch_conjur_token(self, mock_open_url):
ca_path="cert_file")
self.assertEquals("response body", result)

@patch('ansible_collections.cyberark.conjur.plugins.lookup.conjur_variable.open_url')
def test_fetch_conjur_token_401(self, mock_open_url):
mock_response = MagicMock()
mock_response.getcode.return_value == 401
mock_response.read.return_value = "Conjur request has invalid authorization credentials"
mock_open_url.return_value = mock_response
result = _fetch_conjur_token("url", "account", "username1", "api_key", True, "cert_file")
mock_open_url.assert_called_with("url/authn/account/username1/authenticate",
data="api_key",
method="POST",
validate_certs=True,
ca_path="cert_file")
self.assertEquals("Conjur request has invalid authorization credentials", result)

@patch('ansible_collections.cyberark.conjur.plugins.lookup.conjur_variable.open_url')
def test_fetch_conjur_token_500(self, mock_open_url):
mock_response = MagicMock()
mock_response.getcode.return_value == 500
mock_response.read.return_value = "Internal Server Error"
mock_open_url.return_value = mock_response
result = _fetch_conjur_token("url", "account", "username1", "api_key", True, "cert_file")
mock_open_url.assert_called_with("url/authn/account/username1/authenticate",
data="api_key",
method="POST",
validate_certs=True,
ca_path="cert_file")
self.assertEquals("Internal Server Error", result)

@patch('ansible_collections.cyberark.conjur.plugins.lookup.conjur_variable._repeat_open_url')
def test_fetch_conjur_variable(self, mock_repeat_open_url):
mock_response = MagicMock()
Expand All @@ -69,7 +97,7 @@ def test_fetch_conjur_variable(self, mock_repeat_open_url):
@patch('ansible_collections.cyberark.conjur.plugins.lookup.conjur_variable._fetch_conjur_token')
@patch('ansible_collections.cyberark.conjur.plugins.lookup.conjur_variable._merge_dictionaries')
def test_run(self, mock_merge_dictionaries, mock_fetch_conjur_token, mock_fetch_conjur_variable):
mock_fetch_conjur_token.return_value = "token"
mock_fetch_conjur_token.return_value = b'token'
mock_fetch_conjur_variable.return_value = ["conjur_variable"]
mock_merge_dictionaries.side_effect = [
{'account': 'fakeaccount', 'appliance_url': 'https://conjur-fake', 'cert_file': './conjurfake.pem'},
Expand All @@ -86,7 +114,7 @@ def test_run(self, mock_merge_dictionaries, mock_fetch_conjur_token, mock_fetch_
@patch('ansible_collections.cyberark.conjur.plugins.lookup.conjur_variable._fetch_conjur_token')
@patch('ansible_collections.cyberark.conjur.plugins.lookup.conjur_variable._merge_dictionaries')
def test_retrieve_to_file(self, mock_merge_dictionaries, mock_fetch_conjur_token, mock_fetch_conjur_variable):
mock_fetch_conjur_token.return_value = "token"
mock_fetch_conjur_token.return_value = b'token'
mock_fetch_conjur_variable.return_value = ["conjur_variable"]
mock_merge_dictionaries.side_effect = [
{'account': 'fakeaccount', 'appliance_url': 'https://conjur-fake', 'cert_file': './conjurfake.pem'},
Expand Down