Skip to content

Commit

Permalink
fix: disable ssl warnings and enable verify by default
Browse files Browse the repository at this point in the history
  • Loading branch information
MatteoVoges committed Oct 5, 2023
1 parent 39b80cf commit c590206
Showing 1 changed file with 73 additions and 56 deletions.
129 changes: 73 additions & 56 deletions kapitan/refs/vault_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import os

import hvac
from requests.exceptions import SSLError

from kapitan.errors import KapitanError

Expand All @@ -26,8 +27,9 @@ class VaultClient(hvac.Client):

def __init__(self, vault_parameters):
self.vault_parameters = vault_parameters
client_parameters = get_env(vault_parameters)
super().__init__(**client_parameters)

self.client_parameters = self.get_env()
super().__init__(**self.client_parameters)

self.authenticate()

Expand Down Expand Up @@ -64,7 +66,11 @@ def read_token_from_file(self, token_file):

def authenticate(self):
# different login method based on authentication type
auth_type = self.vault_parameters["auth"]
auth_type = self.vault_parameters.get("auth")
if not auth_type:
raise VaultError(
"No authentication method defined. Please specify it in 'parameters.kapitan.secrets.vaultkv.auth'."
)

token = self.get_auth_token()
username = os.getenv("VAULT_USERNAME")
Expand Down Expand Up @@ -109,62 +115,73 @@ def authenticate(self):
else:
raise VaultError(f"Authentication type '{auth_type}' not supported")

if not self.is_authenticated():
try:
authenticated = self.is_authenticated()
except SSLError as e:
ca_path = os.path.abspath(self.client_parameters["verify"])
raise VaultError(f"Error while verifying ca-bundle at {ca_path}: {e.__context__.__context__}")

if not authenticated:
self.adapter.close()
raise VaultError("Vault Authentication Error, Environment Variables defined?")


def get_env(vault_parameters):
"""
The following variables need to be exported to the environment or defined in inventory.
* VAULT_ADDR: url for vault
* VAULT_SKIP_VERIFY: if set, do not verify presented TLS certificate before communicating with Vault server.
* VAULT_CLIENT_KEY: path to an unencrypted PEM-encoded private key matching the client certificate
* VAULT_CLIENT_CERT: path to a PEM-encoded client certificate for TLS authentication to the Vault server
* VAULT_CACERT: path to a PEM-encoded CA cert file to use to verify the Vault server TLS certificate
* VAULT_CAPATH: path to a directory of PEM-encoded CA cert files to verify the Vault server TLS certificate
* VAULT_NAMESPACE: specify the Vault Namespace, if you have one
"""
client_parameters = {}

# fetch missing values from env
variables = ["ADDR", "NAMESPACE", "SKIP_VERIFY", "CACERT", "CAPATH", "CLIENT_KEY", "CLIENT_CERT"]
for var in variables:
var = "VAULT_" + var
if vault_parameters.get(var) is None and os.getenv(var) is not None:
vault_parameters[var] = os.getenv(var)

# set vault adress
vault_address = vault_parameters.get("VAULT_ADDR")
if not vault_address:
raise VaultError("VAULT_ADDR has to be specified in inventory or env")
else:
client_parameters["url"] = vault_address

# set vault namespace
vault_namespace = vault_parameters.get("VAULT_NAMESPACE")
if vault_namespace:
client_parameters["namespace"] = vault_namespace

# set ca cert/path or skip verification
skip_verify = not (str(vault_parameters.get("VAULT_SKIP_VERIFY", False)).lower() == "false")
if skip_verify:
# TODO: surpress ssl warning
client_parameters["verify"] = False
else:
cert = vault_parameters.get("VAULT_CACERT")
cert_path = vault_parameters.get("VAULT_CAPATH")
if cert and os.path.isfile(cert):
client_parameters["verify"] = cert
elif cert_path and os.path.isdir(cert_path):
client_parameters["verify"] = cert_path
def get_env(self):
"""
The following variables need to be exported to the environment or defined in inventory.
* VAULT_ADDR: url for vault
* VAULT_SKIP_VERIFY: if set, do not verify presented TLS certificate before communicating with Vault server.
* VAULT_CLIENT_KEY: path to an unencrypted PEM-encoded private key matching the client certificate
* VAULT_CLIENT_CERT: path to a PEM-encoded client certificate for TLS authentication to the Vault server
* VAULT_CACERT: path to a PEM-encoded CA cert file to use to verify the Vault server TLS certificate
* VAULT_CAPATH: path to a directory of PEM-encoded CA cert files to verify the Vault server TLS certificate
* VAULT_NAMESPACE: specify the Vault Namespace, if you have one
"""
client_parameters = {}

# fetch missing values from env
variables = ["ADDR", "NAMESPACE", "SKIP_VERIFY", "CACERT", "CAPATH", "CLIENT_KEY", "CLIENT_CERT"]
for var in variables:
var = "VAULT_" + var
if self.vault_parameters.get(var) is None and os.getenv(var) is not None:
self.vault_parameters[var] = os.getenv(var)

# set vault adress
vault_address = self.vault_parameters.get("VAULT_ADDR")
if not vault_address:
raise VaultError("VAULT_ADDR has to be specified in inventory or env")
else:
client_parameters["url"] = vault_address

# set vault namespace
vault_namespace = self.vault_parameters.get("VAULT_NAMESPACE")
if vault_namespace:
client_parameters["namespace"] = vault_namespace

# set ca cert/path or skip verification
skip_verify_value = self.vault_parameters.get("VAULT_SKIP_VERIFY", False)
skip_verify_bool = not (str(skip_verify_value).lower() == "false")
if skip_verify_bool:
# disable ssl warnings
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
client_parameters["verify"] = False
else:
raise VaultError("Neither VAULT_CACERT nor VAULT_CAPATH specified or exist")
cert = self.vault_parameters.get("VAULT_CACERT")
cert_path = self.vault_parameters.get("VAULT_CAPATH")
if cert and os.path.isfile(cert):
client_parameters["verify"] = cert
elif cert_path and os.path.isdir(cert_path):
client_parameters["verify"] = cert_path
else:
raise VaultError(
"Neither VAULT_CACERT nor VAULT_CAPATH specified or found. If you don't want to verify the requests, set VAULT_SKIP_VERIFY to 'true'."
)

# set client cert for tls authentication
client_key = vault_parameters.get("VAULT_CLIENT_KEY")
client_cert = vault_parameters.get("VAULT_CLIENT_CERT")
if client_key and client_cert:
client_parameters["cert"] = (client_cert, client_key)
# set client cert for tls authentication
client_key = self.vault_parameters.get("VAULT_CLIENT_KEY")
client_cert = self.vault_parameters.get("VAULT_CLIENT_CERT")
if client_key and client_cert:
client_parameters["cert"] = (client_cert, client_key)

return client_parameters
return client_parameters

0 comments on commit c590206

Please sign in to comment.