-
Notifications
You must be signed in to change notification settings - Fork 23
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Add fetchLocalCommands helper method. Adds a helper method to the ComplianceFetcher class. It can be used to execute commands locally and retrieve output. The output is formatted to look like it was copied from a terminal so that it can be stored as evidence. * Fix locker branch implementation. This change allows the branch to be specified via `locker.branch` in the main configuration file. A new branch is created if it doesn't exist on the remote. * Make local evidence repository path configurable. This change allows the local evidence repository path to be specified via `locker.local_path` in the main configuration file. * Implement agent mode. This change implements agent mode. Any evidence created in agent mode will be stored under the corresponding agent directory. Agents will cryptographically sign any evidence they fetch. Signed evidence can be used in checks and is automatically verified when loaded from the locker. * Add documentation explaining how to verify encrypted evidence. * Add file 'compliance/utils/fetch_local_commands' to manifest. * Attempt to import any missing fetchers from include JSON file. * Correctly set agent when loading evidence from the cache. * Add 'locker.ignore_signatures' configuration. Set 'locker.ignore_signatures' to 'true' to skip signature verification. * Use correct evidence metadata when fetching previous revisions. Ensure the 'evidence_dt' is specified when fetching evidence metadata. * Fix agent class property return types. Change following properties to correctly return a boolean type: - compliance.agent.ComplianceAgent.signable - compliance.agent.ComplianceAgent.verifiable * Add 'locker.default_branch' configuration. Defaults to 'master'. * Add 'locker.force_push' configuration. Defaults to False. * Add 'locker.agent_public_key' configuration. Allow an Agent public key to be specified in the configuration. * Update CHANGES.md for 1.22.0 release.
- Loading branch information
Showing
18 changed files
with
1,175 additions
and
68 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,2 @@ | ||
graft compliance/templates | ||
include compliance/utils/fetch_local_commands |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,182 @@ | ||
# -*- mode:python; coding:utf-8 -*- | ||
# Copyright (c) 2022 IBM Corp. All rights reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
"""Compliance check automation module.""" | ||
|
||
import base64 | ||
import hashlib | ||
from pathlib import PurePath | ||
|
||
from compliance.config import get_config | ||
|
||
from cryptography.exceptions import InvalidSignature | ||
from cryptography.hazmat.backends import default_backend | ||
from cryptography.hazmat.primitives import hashes, serialization | ||
from cryptography.hazmat.primitives.asymmetric import padding | ||
from cryptography.hazmat.primitives.asymmetric.utils import Prehashed | ||
|
||
|
||
class ComplianceAgent: | ||
"""Compliance agent class.""" | ||
|
||
AGENTS_DIR = 'agents' | ||
PUBLIC_KEYS_EVIDENCE_PATH = 'raw/auditree/agent_public_keys.json' | ||
|
||
def __init__(self, name=None, use_agent_dir=True): | ||
"""Construct and initialize the agent object.""" | ||
self._name = name | ||
self._private_key = self._public_key = None | ||
self._use_agent_dir = use_agent_dir | ||
|
||
@property | ||
def name(self): | ||
"""Get agent name.""" | ||
return self._name | ||
|
||
@property | ||
def private_key(self): | ||
"""Get agent private key.""" | ||
return self._private_key | ||
|
||
@private_key.setter | ||
def private_key(self, data_bytes): | ||
""" | ||
Set agent private key. | ||
:param data_bytes: The PEM encoded key data as `bytes`. | ||
""" | ||
self._private_key = serialization.load_pem_private_key( | ||
data_bytes, None, default_backend() | ||
) | ||
|
||
@property | ||
def public_key(self): | ||
"""Get agent public key.""" | ||
return self._public_key | ||
|
||
@public_key.setter | ||
def public_key(self, data_bytes): | ||
""" | ||
Set agent public key. | ||
:param data_bytes: The PEM encoded key data as `bytes`. | ||
""" | ||
if self.name: | ||
self._public_key = serialization.load_pem_public_key(data_bytes) | ||
|
||
def get_path(self, path): | ||
""" | ||
Get the full evidence path. | ||
:param path: The relative evidence path as a string. | ||
:returns: The full evidence path as a string. | ||
""" | ||
if self.name and self._use_agent_dir: | ||
if PurePath(path).parts[0] != self.AGENTS_DIR: | ||
return str(PurePath(self.AGENTS_DIR, self.name, path)) | ||
return path | ||
|
||
def signable(self): | ||
"""Determine if the agent can sign evidence.""" | ||
return all([self.name, self.private_key]) | ||
|
||
def verifiable(self): | ||
"""Determine if the agent can verify evidence.""" | ||
return all([self.name, self.public_key]) | ||
|
||
def load_public_key_from_locker(self, locker): | ||
""" | ||
Load agent public key from locker. | ||
:param locker: A locker of type :class:`compliance.locker.Locker`. | ||
""" | ||
if not self.name: | ||
return | ||
try: | ||
public_keys = locker.get_evidence(self.PUBLIC_KEYS_EVIDENCE_PATH) | ||
public_key_str = public_keys.content_as_json[self.name] | ||
self.public_key = public_key_str.encode() | ||
except Exception: | ||
self._public_key = None # Missing public key evidence. | ||
|
||
def hash_and_sign(self, data_bytes): | ||
""" | ||
Hash and sign evidence using the agent private key. | ||
:param data_bytes: The data to sign as `bytes`. | ||
:returns: A `tuple` containing the hexadecimal digest string and the | ||
base64 encoded signature string. Returns tuple `(None, None)` if the | ||
agent is not configured to sign evidence. | ||
""" | ||
if not self.signable(): | ||
return None, None | ||
hashed = hashlib.sha256(data_bytes) | ||
signature = self.private_key.sign( | ||
hashed.digest(), | ||
padding.PSS( | ||
mgf=padding.MGF1(hashes.SHA256()), | ||
salt_length=padding.PSS.MAX_LENGTH | ||
), | ||
Prehashed(hashes.SHA256()) | ||
) | ||
return hashed.hexdigest(), base64.b64encode(signature).decode() | ||
|
||
def verify(self, data_bytes, signature_b64): | ||
""" | ||
Verify evidence using the agent public key. | ||
:param data_bytes: The data to verify as `bytes`. | ||
:param signature_b64: The base64 encoded signature string. | ||
:returns: `True` if data can be verified, else `False`. | ||
""" | ||
if not self.verifiable(): | ||
return False | ||
try: | ||
self.public_key.verify( | ||
base64.b64decode(signature_b64), | ||
data_bytes, | ||
padding.PSS( | ||
mgf=padding.MGF1(hashes.SHA256()), | ||
salt_length=padding.PSS.MAX_LENGTH | ||
), | ||
hashes.SHA256() | ||
) | ||
return True | ||
except InvalidSignature: | ||
return False | ||
|
||
@classmethod | ||
def from_config(cls): | ||
"""Load agent from configuration.""" | ||
config = get_config() | ||
agent = cls( | ||
name=config.get('agent_name'), | ||
use_agent_dir=config.get('use_agent_dir', True) | ||
) | ||
private_key_path = config.get('agent_private_key') | ||
public_key_path = config.get('agent_public_key') | ||
if private_key_path: | ||
with open(private_key_path, 'rb') as key_file: | ||
agent.private_key = key_file.read() | ||
agent.public_key = agent.private_key.public_key().public_bytes( | ||
encoding=serialization.Encoding.PEM, | ||
format=serialization.PublicFormat.SubjectPublicKeyInfo | ||
) | ||
elif public_key_path: | ||
with open(public_key_path, 'rb') as key_file: | ||
agent.public_key = key_file.read() | ||
return agent |
Oops, something went wrong.