Skip to content

Commit

Permalink
Merge pull request #4878 from wazuh/enhacement/4591-vd-vuln-cases
Browse files Browse the repository at this point in the history
Vulnerability Detector E2E Tests
  • Loading branch information
davidjiglesias authored Feb 2, 2024
2 parents 07c1a0e + a0e7df6 commit d8b9943
Show file tree
Hide file tree
Showing 36 changed files with 4,361 additions and 46 deletions.
1 change: 1 addition & 0 deletions deps/wazuh_testing/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
'data/all_disabled_ossec.conf',
'data/syscollector_parsed_packages.json',
'tools/migration_tool/delta_schema.json',
'end_to_end/vulnerability_detector_packages/vuln_packages.json',
'tools/migration_tool/CVE_JSON_5.0_bundled.json'
]

Expand Down
16 changes: 16 additions & 0 deletions deps/wazuh_testing/wazuh_testing/end_to_end/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,22 @@

fetched_alerts_json_path = os.path.join(gettempdir(), 'alerts.json')

base_path = {
'linux': '/var/ossec',
'windows': r'C:\Program Files (x86)\ossec-agent',
'macos': '/Library/Ossec'
}
configuration_filepath_os = {
'linux': os.path.join(base_path['linux'], 'etc', 'ossec.conf'),
'windows': os.path.join(base_path['windows'], 'ossec.conf'),
'macos': os.path.join(base_path['macos'], 'etc', 'ossec.conf')
}
logs_filepath_os = {
'linux': os.path.join(base_path['linux'], 'logs', 'ossec.log'),
'windows': os.path.join(base_path['windows'], 'ossec.log'),
'macos': os.path.join(base_path['macos'], 'logs', 'ossec.log')
}


@retry(Exception, attempts=3, delay=5)
def get_alert_indexer_api(query, credentials, ip_address, index='wazuh-alerts-4.x-*'):
Expand Down
204 changes: 204 additions & 0 deletions deps/wazuh_testing/wazuh_testing/end_to_end/configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
"""
Module for change configurations of remote hosts.
----------------------------------------
This module provides functions for configuring and managing remote host
configurations using the HostManager class and related tools.
Functions:
- backup_configurations: Backup configurations for all hosts in the specified host manager.
- restore_configuration: Restore configurations for all hosts in the specified host manager.
- configure_host: Configure a specific host.
- configure_environment: Configure the environment for all hosts in the specified host manager.
Copyright (C) 2015, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2
"""
import xml.dom.minidom
import logging

from multiprocessing.pool import ThreadPool
from typing import Dict, List

from wazuh_testing.end_to_end import configuration_filepath_os
from wazuh_testing.tools.configuration import set_section_wazuh_conf
from wazuh_testing.tools.system import HostManager


def backup_configurations(host_manager: HostManager) -> Dict[str, str]:
"""
Backup configurations for all hosts in the specified host manager.
Args:
host_manager: An instance of the HostManager class containing information about hosts.
Returns:
dict: A dictionary mapping host names to their configurations.
Example of returned dictionary:
{
'manager': '<ossec_config>...</ossec_config>',
'agent1': ...
}
"""
logging.info("Backing up configurations")
backup_configurations = {}
for host in host_manager.get_group_hosts('all'):
host_os_name = host_manager.get_host_variables(host)['os_name']
configuration_filepath = configuration_filepath_os[host_os_name]

backup_configurations[host] = host_manager.get_file_content(str(host),
configuration_filepath)
logging.info("Configurations backed up")
return backup_configurations


def restore_configuration(host_manager: HostManager, configuration: Dict[str, List]) -> None:
"""
Restore configurations for all hosts in the specified host manager.
Args:
host_manager: An instance of the HostManager class containing information about hosts.
configuration: A dictionary mapping host names to their configurations.
Example of configuration dictionary:
{
'manager': '<ossec_config>...</ossec_config>',
'agent1': ...
}
"""
logging.info("Restoring configurations")
for host in host_manager.get_group_hosts('all'):
host_os_name = host_manager.get_host_variables(host)['os_name']
configuration_filepath = configuration_filepath_os[host_os_name]

host_manager.modify_file_content(host, configuration_filepath, configuration[host])
logging.info("Configurations restored")


def configure_host(host: str, host_configuration: Dict[str, Dict], host_manager: HostManager) -> None:
"""
Configure a specific host.
Args:
host: The name of the host to be configured.
host_configuration: Role of the configured host for the host. Check below for example.
host_manager: An instance of the HostManager class containing information about hosts.
Note: The host_configuration dictionary must contain a list of sections and elements to be configured. The sections
not included in the dictionary will not be modified maintaining the current configuration.
Example of host_configuration dictionary:
{
"manager1":[
{
"sections":[
{
"section":"vulnerability-detection",
"elements":[
{
"enabled":{
"value":"yes"
}
},
{
"index-status":{
"value":"yes"
}
},
{
"feed-update-interval":{
"value":"2h"
}
}
]
},
],
"metadata":{}
}
],
}
"""
logging.info(f"Configuring host {host}")

host_os = host_manager.get_host_variables(host)['os_name']
config_file_path = configuration_filepath_os[host_os]

host_config = host_configuration.get(host)

if not host_config:
raise TypeError(f"Host {host} configuration does not include a valid role (manager or agent):"
f"{host_configuration}")

current_config = host_manager.get_file_content(str(host), config_file_path)

# Extract the sections from the first element of host_config

sections = host_config[0].get('sections')

# Combine the current hos configuration and the desired configuration
new_config_unformatted = set_section_wazuh_conf(sections, current_config.split("\n"))

# Format new configuration
new_config_formatted_xml = xml.dom.minidom.parseString(''.join(new_config_unformatted))

# Get rid of the first no expected XML version line
new_config_formatted_xml = new_config_formatted_xml.toprettyxml().split("\n")[1:]

final_configuration = "\n".join(new_config_formatted_xml)

host_manager.modify_file_content(str(host), config_file_path, final_configuration)

logging.info(f"Host {host} configured")


def configure_environment(host_manager: HostManager, configurations: Dict[str, List]) -> None:
"""
Configure the environment for all hosts in the specified host manager.
Args:
host_manager: An instance of the HostManager class containing information about hosts.
configurations: A dictionary mapping host roles to their configuration details.
Example of host_configurations dictionary:
{
"manager1":[
{
"sections":[
{
"section":"vulnerability-detection",
"elements":[
{
"enabled":{
"value":"yes"
}
},
{
"index-status":{
"value":"yes"
}
},
{
"feed-update-interval":{
"value":"2h"
}
}
]
},
],
"metadata":{}
}
],
}
"""
logging.info("Configuring environment")
configure_environment_parallel_map = [(host, configurations) for host in host_manager.get_group_hosts('all')]

with ThreadPool() as pool:
pool.starmap(configure_host,
[(host, config, host_manager) for host, config in configure_environment_parallel_map])

logging.info("Environment configured")
83 changes: 83 additions & 0 deletions deps/wazuh_testing/wazuh_testing/end_to_end/indexer_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""
Wazuh API Indexer Module.
-----------------------------------
This module provides functions to interact with the Wazuh Indexer API.
Functions:
- get_indexer_values: Retrieves values from the Indexer API.
Copyright (C) 2015, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2
"""
import requests
import logging
from typing import Dict

from wazuh_testing.tools.system import HostManager


STATE_INDEX_NAME = 'wazuh-vulnerabilities-states'


def get_indexer_values(host_manager: HostManager, credentials: dict = {'user': 'admin', 'password': 'changeme'},
index: str = 'wazuh-alerts*', greater_than_timestamp=None) -> Dict:
"""
Get values from the Wazuh Indexer API.
Args:
host_manager: An instance of the HostManager class containing information about hosts.
credentials (Optional): A dictionary containing the Indexer credentials. Defaults to
{'user': 'admin', 'password': 'changeme'}.
index (Optional): The Indexer index name. Defaults to 'wazuh-alerts*'.
greater_than_timestamp (Optional): The timestamp to filter the results. Defaults to None.
Returns:
Dict: A dictionary containing the values retrieved from the Indexer API.
"""
logging.info(f"Getting values from the Indexer API for index {index}")

url = f"https://{host_manager.get_master_ip()}:9200/{index}/_search"
headers = {
'Content-Type': 'application/json',
}

data = {
"query": {
"match_all": {}
}
}

if greater_than_timestamp:
query = {
"bool": {
"must": [
{"match_all": {}},
{"range": {"@timestamp": {"gte": f"{greater_than_timestamp}"}}}
]
}
}

sort = [
{
"@timestamp": {
"order": "desc"
}
}
]

data['query'] = query
data['sort'] = sort

param = {
'pretty': 'true',
'size': 10000,
}

response = requests.get(url=url, params=param, verify=False,
auth=requests.auth.HTTPBasicAuth(credentials['user'], credentials['password']),
headers=headers,
json=data)

return response.json()
61 changes: 61 additions & 0 deletions deps/wazuh_testing/wazuh_testing/end_to_end/logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""
Logs management module for remote hosts.
---------------------------------------
Description:
This module provides functions for truncating logs and alerts for Wazuh agents and managers.
Functions:
- truncate_remote_host_group_files: Truncate the specified files in all the host of a group
- get_hosts_logs: Get the logs from the specified host group
Copyright (C) 2015, Wazuh Inc.
Created by Wazuh, Inc. <[email protected]>.
This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2
"""
from typing import Dict

from wazuh_testing import ALERTS_JSON_PATH
from wazuh_testing.end_to_end import logs_filepath_os
from wazuh_testing.tools.system import HostManager


def truncate_remote_host_group_files(host_manager: HostManager, host_group: str,
file_to_truncate: str = 'logs') -> None:
"""
Truncate log or alert files on remote hosts in a specified host group.
Parameters:
- host_manager (HostManager): An instance of the HostManager class for managing remote hosts.
- host_group (str): The name of the host group where the files will be truncated.
- file_to_truncate (str, optional): The type of file to truncate. Default is 'logs'.
Possible values are 'logs' for log files or 'alerts' for alert files.
"""
for host in host_manager.get_group_hosts(host_group):
if file_to_truncate == 'logs':
host_os_name = host_manager.get_host_variables(host)['os_name']
log_file_path = logs_filepath_os[host_os_name]
elif file_to_truncate == 'alerts':
log_file_path = ALERTS_JSON_PATH
else:
log_file_path = file_to_truncate

host_manager.truncate_file(host, log_file_path)


def get_hosts_logs(host_manager: HostManager, host_group: str = 'all') -> Dict[str, str]:
"""
Get the logs from the specified host group.
Parameters:
- host_manager (HostManager): An instance of the HostManager class for managing remote hosts.
- host_group (str, optional): The name of the host group where the files will be truncated.
Default is 'all'.
"""
host_logs = {}
for host in host_manager.get_group_hosts(host_group):
host_os_name = host_manager.get_host_variables(host)['os_name']
host_logs[host] = host_manager.get_file_content(host, logs_filepath_os[host_os_name])

return host_logs
Loading

0 comments on commit d8b9943

Please sign in to comment.