Skip to content

Commit

Permalink
Merge pull request #176 from patrikspiess/Traceback-on-vault-certific…
Browse files Browse the repository at this point in the history
…ate-error-#174

Traceback on vault certificate error #174
  • Loading branch information
lucmurer authored Dec 11, 2023
2 parents 9573f03 + 551bb5f commit a9ce959
Show file tree
Hide file tree
Showing 11 changed files with 126 additions and 37 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: tests
name: Code Tests 🚨
run-name: testing workflow invoked by ${{ github.actor }}

on: [push, pull_request]
Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ For unreleased changes see [WHATSNEW.md](WHATSNEW.md)

# [Released]

## [2.0.1] - 2023-11-29

### Added

- Option ssl_verify for Vault service

## [2.0.0] - 2023-11-28

### Added
Expand Down
6 changes: 3 additions & 3 deletions WHATSNEW.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
### Added

### Changed
## [2.0.1] - 2023-11-29

### Removed
### Added

- Option ssl_verify for Vault service
6 changes: 6 additions & 0 deletions docs/source/usage/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,12 @@ Instead of storing the secret_id in the yaml configuration file you may use the
variable ``FOTOOBO_VAULT_SECRET_ID`` to store that value. The environment variable takes precedence
over the configuration file should both be defined.

ssl_verify (optional, default: True)
""""""""""""""""""""""""""""""""""""

Check host SSL certificate (true) or not (false). You can also provide a path to a custom CA
certificate or CA bundle. Please be aware that disabling SSL certificate verification is a security
risk and should not be used in a production environment.

token_file (optional)
"""""""""""""""""""""
Expand Down
5 changes: 3 additions & 2 deletions fotoobo.yaml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ logging:
# Instead of storing credentials in the inventory file you may use VAULT as a placeholder. All asset
# attributes that are VAULT will be retreived from the Hashicorp Vault service specified here.
# The Hashicorp Vault service will use approle login to get the data.
# This section is optional and may pose a security issue. In future versions we may also read the
# role_id and secret_id from the system environment.
# We may also read the role_id and secret_id from the system environment. This makes it a little bit
# more secure.
vault:
url: https://vault.local
ssl_verify: false
namespace: vault_namespace
data_path: /v1/kv/data/fotoobo
role_id: ...
Expand Down
2 changes: 1 addition & 1 deletion fotoobo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@
from .fortinet import FortiAnalyzer, FortiClientEMS, FortiGate, FortiManager

__all__ = ["FortiAnalyzer", "FortiClientEMS", "FortiGate", "FortiManager"]
__version__: str = "2.0.0"
__version__: str = "2.0.1"
11 changes: 5 additions & 6 deletions fotoobo/fortinet/fortinet.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
import logging
from abc import ABC, abstractmethod
from time import time
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Union

import requests
import urllib3

from fotoobo.exceptions import APIError, GeneralError
from fotoobo.helpers.config import config

log = logging.getLogger("fotoobo")

Expand Down Expand Up @@ -41,7 +40,7 @@ def __init__(self, hostname: str, **kwargs: Any) -> None:
If you need to connect to your Fortinet device through a proxy server you can
set it here as as string. If needed you may append the proxy server port with a
column to the proxy server. e.g. "proxy.local:8000".
ssl_verify (bool): enable/disable SSL certificate check
ssl_verify (bool | str): enable/disable SSL certificate check
When ssl_verify is enabled you have to install a trusted SSL certificate onto
the device you wish to connect to. If you set ssl_verify to false it will also
disable the warnings in urllib3. This prevents unwanted SSL warnings to be
Expand All @@ -58,9 +57,9 @@ def __init__(self, hostname: str, **kwargs: Any) -> None:
if proxy := kwargs.get("proxy", ""):
self.session.proxies = {"http": f"{proxy}", "https": f"{proxy}"}

self.ssl_verify: bool = kwargs.get("ssl_verify", config.ssl_verify)
self.ssl_verify: Union[bool, str] = kwargs.get("ssl_verify", True)
if not self.ssl_verify:
urllib3.disable_warnings()
urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning)

self.timeout = kwargs.get("timeout", 3)
self.type: str = ""
Expand Down Expand Up @@ -156,7 +155,7 @@ def api( # pylint: disable=too-many-arguments

try:
response.raise_for_status()
except (requests.exceptions.HTTPError) as err:
except requests.exceptions.HTTPError as err:
raise APIError(err) from err

return response
Expand Down
2 changes: 0 additions & 2 deletions fotoobo/helpers/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class Config:
audit_logging: Union[Dict[str, Any], None] = None
no_logo: bool = False
cli_info: Dict[str, Any] = field(default_factory=dict)
ssl_verify: Union[str, bool] = True
vault: Dict[str, str] = field(default_factory=dict)

def load_configuration(self, config_file: Union[Path, None]) -> None:
Expand Down Expand Up @@ -77,7 +76,6 @@ def load_configuration(self, config_file: Union[Path, None]) -> None:
self.logging = loaded_config.get("logging", {})
self.audit_logging = loaded_config.get("audit_logging", {})
self.no_logo = loaded_config.get("no_logo", self.no_logo)
self.ssl_verify = loaded_config.get("ssl_verify", self.ssl_verify)

self.vault = loaded_config.get("vault", {})
if self.vault:
Expand Down
63 changes: 42 additions & 21 deletions fotoobo/helpers/vault.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@

import logging
from pathlib import Path
from typing import Any, Optional
from typing import Any, Optional, Union

import requests
import urllib3

from fotoobo.exceptions.exceptions import GeneralError

Expand All @@ -31,6 +32,7 @@ def __init__( # pylint: disable=too-many-arguments
data_path: str,
role_id: str,
secret_id: str,
ssl_verify: Union[bool, str] = True,
token_file: Optional[str] = None,
token_ttl_limit: int = 0,
) -> None:
Expand All @@ -42,6 +44,11 @@ def __init__( # pylint: disable=too-many-arguments
data_path: The path where the vault data for fotoobo is stored
role_id: The approle role_id
secret_id: The approle secret_id
ssl_verify: Enable/disable SSL certificate check
When ssl_verify is enabled you have to install a trusted SSL certificate onto
the device you wish to connect to. If you set ssl_verify to false it will also
disable the warnings in urllib3. This prevents unwanted SSL warnings to be
logged.
token_file: The file to store the access token to. If no file is given the token
is not loaded or stored to a file and every execution will issue a
new token.
Expand All @@ -57,10 +64,15 @@ def __init__( # pylint: disable=too-many-arguments
self.token_file: Optional[Path] = None
self.token_ttl_limit: int = token_ttl_limit
self.url: str = url.strip("/")
self.ssl_verify: Union[bool, str] = ssl_verify
if not self.ssl_verify:
log.warning("SSL verify for vault service is disabled which may be a security issue")
urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning)

log.debug("vault_client_url: '%s'", self.url)
log.debug("vault_client_ssl_verify: '%s'", self.ssl_verify)
log.debug("vault_client_namespace: '%s'", self.namespace)
log.debug("vault_data_path: '%s'", self.data_path)
log.debug("vault_client_data_path: '%s'", self.data_path)
log.debug("vault_client_role_id: '%s...%s'", self.role_id[:4], self.role_id[-5:-1])
log.debug("vault_client_secret_id: '%s...%s'", self.secret_id[:4], self.secret_id[-5:-1])
log.debug("vault_client_token_ttl_limit: '%s'", self.token_ttl_limit)
Expand Down Expand Up @@ -93,7 +105,7 @@ def get_data(self, timeout: int = 3) -> Any:
"X-Vault-Token": self.token,
"X-Vault-Namespace": self.namespace,
}
response = requests.get(url=url, headers=headers, timeout=timeout)
response = requests.get(url=url, headers=headers, timeout=timeout, verify=self.ssl_verify)

if response.ok:
log.debug("Response status_code is '%s'", response.status_code)
Expand All @@ -117,17 +129,19 @@ def get_token(self, timeout: int = 3) -> bool:
url = f"{self.url}/v1/auth/approle/login"
log.debug("Get new token from '%s'", url)
data = {"role_id": self.role_id, "secret_id": self.secret_id}
response = requests.post(url, data=data, timeout=timeout)
log.debug("Response status_code is '%s'", response.status_code)
if response.ok:
self.token = response.json()["auth"]["client_token"]
if self.token_file:
self.save_token()
self.token = ""
try:
response = requests.post(url, data=data, timeout=timeout, verify=self.ssl_verify)
log.debug("Response status_code is '%s'", response.status_code)
if response.ok:
self.token = response.json()["auth"]["client_token"]
if self.token_file:
self.save_token()

else:
self.token = ""
except (requests.exceptions.SSLError, requests.exceptions.ConnectionError) as err:
log.error("Request Error: %s", str(err))

return response.ok
return bool(self.token)

def load_token(self) -> bool:
"""Load the token from a file
Expand Down Expand Up @@ -175,17 +189,24 @@ def validate_token(self, timeout: int = 3) -> bool:
url = f"{self.url}/v1/auth/token/lookup-self"
log.debug("Check if vault token still is valid")
headers = {"X-Vault-Token": self.token}
response = requests.get(url=url, headers=headers, timeout=timeout)
log.debug("Response status_code is '%s'", response.status_code)
if response.ok:
log.debug("Vault token is valid for '%s' seconds", response.json()["data"]["ttl"])
log.debug("vault_client_token: '%s...%s'", self.token[:8], self.token[-5:-1])
if response.json()["data"]["ttl"] < self.token_ttl_limit:
log.debug("Invalidate token due to ttl limit")
try:
response = requests.get(
url=url, headers=headers, timeout=timeout, verify=self.ssl_verify
)
log.debug("Response status_code is '%s'", response.status_code)
if response.ok:
log.debug("Vault token is valid for '%s' seconds", response.json()["data"]["ttl"])
log.debug("vault_client_token: '%s...%s'", self.token[:8], self.token[-5:-1])
if response.json()["data"]["ttl"] < self.token_ttl_limit:
log.debug("Invalidate token due to ttl limit")
self.token = ""

else:
log.debug("Token is not valid")
self.token = ""

else:
log.debug("Token is not valid")
except (requests.exceptions.SSLError, requests.exceptions.ConnectionError) as err:
self.token = ""
log.error("Request Error: %s", str(err))

return bool(self.token)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "fotoobo"
version = "2.0.0"
version = "2.0.1"
description = "The awesome Fortinet Toolbox"
authors = ["Patrik Spiess <[email protected]>", "Lukas Murer-Jäckle <[email protected]>"]
readme = "README.md"
Expand Down
58 changes: 58 additions & 0 deletions tests/helpers/test_vault.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from unittest.mock import MagicMock

import pytest
import requests
from _pytest.monkeypatch import MonkeyPatch

from fotoobo.exceptions.exceptions import GeneralError
Expand All @@ -30,6 +31,7 @@ def test_init(token_file: str, monkeypatch: MonkeyPatch) -> None:
monkeypatch.setattr("fotoobo.helpers.vault.Client.load_token", MagicMock(result=True))
client = Client(
url="dummy_url",
ssl_verify=False,
namespace="dummy_namespace",
data_path="dummy_data_path",
role_id="dummy_role_id",
Expand Down Expand Up @@ -127,6 +129,34 @@ def test_validate_token(response: ResponseMock, valid: bool, monkeypatch: Monkey
assert client.validate_token() == valid
assert bool(client.token) == valid

@staticmethod
@pytest.mark.parametrize(
"mock",
(
pytest.param(
MagicMock(side_effect=requests.exceptions.SSLError("dummy")),
id="SSLError",
),
pytest.param(
MagicMock(side_effect=requests.exceptions.ConnectionError("dummy")),
id="ConnectionError",
),
),
)
def test_validate_token_exception(mock: ResponseMock, monkeypatch: MonkeyPatch) -> None:
"""Test the Client validate_token with exception"""
monkeypatch.setattr("fotoobo.helpers.vault.requests.get", mock)
client = Client(
url="https://dummy_url",
namespace="dummy_namespace",
data_path="dummy_data_path",
role_id="dummy_role_id",
secret_id="dummy_secret_id",
)
client.token = "dummy_token"
assert client.validate_token() is False
assert client.token == ""

@staticmethod
@pytest.mark.parametrize(
"response, token",
Expand Down Expand Up @@ -165,6 +195,34 @@ def test_get_token(
if token:
assert token_file.is_file()

@staticmethod
@pytest.mark.parametrize(
"mock",
(
pytest.param(
MagicMock(side_effect=requests.exceptions.SSLError("dummy")),
id="SSLError",
),
pytest.param(
MagicMock(side_effect=requests.exceptions.ConnectionError("dummy")),
id="ConnectionError",
),
),
)
def test_get_token_exception(mock: ResponseMock, monkeypatch: MonkeyPatch) -> None:
"""Test the Client get_token with exception"""
monkeypatch.setattr("fotoobo.helpers.vault.requests.post", mock)
client = Client(
url="https://dummy_url",
namespace="dummy_namespace",
data_path="dummy_data_path",
role_id="dummy_role_id",
secret_id="dummy_secret_id",
)
client.token = "dummy_token"
assert client.get_token() is False
assert client.token == ""

@staticmethod
@pytest.mark.parametrize(
"response, data",
Expand Down

0 comments on commit a9ce959

Please sign in to comment.