Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add script for generate FIM events #4972

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions deps/wazuh_testing/wazuh_testing/scripts/create_test_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import os
import sys
import signal
import argparse


if sys.platform == 'win32':
import win32api
import win32con
import pywintypes


if sys.platform == 'win32':
registry_parser = {
'HKEY_LOCAL_MACHINE': win32con.HKEY_LOCAL_MACHINE
}

registry_class_name = {
win32con.HKEY_LOCAL_MACHINE: 'HKEY_LOCAL_MACHINE'
}

registry_value_type = {
win32con.REG_SZ: 'REG_SZ'
}

REG_SZ = win32con.REG_SZ
KEY_WOW64_64KEY = win32con.KEY_WOW64_64KEY
KEY_ALL_ACCESS = win32con.KEY_ALL_ACCESS
RegOpenKeyEx = win32api.RegOpenKeyEx
KEY = "HKEY_LOCAL_MACHINE"

monitored_directory = os.path.join("C:", os.sep, "stress_test") if sys.platform == 'win32' else os.path.join("/" "stress_test")
testreg = os.path.join('SOFTWARE', 'testreg')


def signal_handler(sig, frame):
print("Signal received. Exiting...")
sys.exit(0)


def create_files(test_files):
for filename in test_files:
with open(os.path.join(monitored_directory, filename), 'w+') as f:
f.write('This is a test file')


def create_registry(key, subkey, arch):
"""Create a registry given the key and the subkey. The registry is opened if it already exists.

Args:
key (pyHKEY): the key of the registry (HKEY_* constants).
subkey (str): the subkey (name) of the registry.
arch (int): architecture of the registry (KEY_WOW64_32KEY or KEY_WOW64_64KEY).

Returns:
str: the key handle of the new/opened key.
"""

if sys.platform == 'win32':
try:
print("Creating registry key " + str(os.path.join(registry_class_name[key], subkey)))

key = win32api.RegCreateKeyEx(key, subkey, win32con.KEY_ALL_ACCESS | arch)

return key[0] # Ignore the flag that RegCreateKeyEx returns
except OSError as e:
print(f"Registry could not be created: {e}")
except pywintypes.error as e:
print(f"Registry could not be created: {e}")


def main(num_files):
if sys.platform == 'win32':
for n_registry in range(1, num_files+1):
h_key = create_registry(registry_parser[KEY], f'{testreg}{n_registry}', KEY_WOW64_64KEY)
else:
if not os.path.exists(monitored_directory):
os.makedirs(monitored_directory)

test_files = [f"Testing{i}.txt" for i in range(1, num_files+1)]
create_files(test_files)


if __name__ == "__main__":

signal.signal(signal.SIGINT, signal_handler)

parser = argparse.ArgumentParser(description='File manipulation script')
parser.add_argument('--num-files', type=int, default=5, help='Number of files to create')
args = parser.parse_args()

main(args.num_files)
173 changes: 173 additions & 0 deletions deps/wazuh_testing/wazuh_testing/scripts/generate_fim_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
import os
import random
import string
import time
import argparse
import sys
import shutil
import signal
if sys.platform == 'win32':
import win32api
import win32con
import pywintypes

monitored_directory = os.path.join("C:", os.sep, "stress_test") if sys.platform == 'win32' else os.path.join("/" "stress_test")
if sys.platform == 'win32':
registry_parser = {
'HKEY_LOCAL_MACHINE': win32con.HKEY_LOCAL_MACHINE
}

registry_class_name = {
win32con.HKEY_LOCAL_MACHINE: 'HKEY_LOCAL_MACHINE'
}

registry_value_type = {
win32con.REG_SZ: 'REG_SZ'
}

REG_SZ = win32con.REG_SZ
KEY_WOW64_64KEY = win32con.KEY_WOW64_64KEY
KEY_ALL_ACCESS = win32con.KEY_ALL_ACCESS
RegOpenKeyEx = win32api.RegOpenKeyEx
KEY = "HKEY_LOCAL_MACHINE"

testreg = os.path.join('SOFTWARE', 'testreg')
reg_value = 'value_name'


def signal_handler(sig, frame):
print("Signal received. Exiting...")
sys.exit(0)


def create_registry(key, subkey, arch):
"""Create a registry given the key and the subkey. The registry is opened if it already exists.

Args:
key (pyHKEY): the key of the registry (HKEY_* constants).
subkey (str): the subkey (name) of the registry.
arch (int): architecture of the registry (KEY_WOW64_32KEY or KEY_WOW64_64KEY).

Returns:
str: the key handle of the new/opened key.
"""

if sys.platform == 'win32':
try:
print("Creating registry key " + str(os.path.join(registry_class_name[key], subkey)))

key = win32api.RegCreateKeyEx(key, subkey, win32con.KEY_ALL_ACCESS | arch)

return key[0] # Ignore the flag that RegCreateKeyEx returns
except OSError as e:
print(f"Registry could not be created: {e}")
except pywintypes.error as e:
print(f"Registry could not be created: {e}")


def delete_registry(key, subkey, arch):
"""Delete a registry key.

Args:
key (pyHKEY): the key of the registry (HKEY_* constants).
subkey (str): the subkey (name) of the registry.
arch (int): architecture of the registry (KEY_WOW64_32KEY or KEY_WOW64_64KEY).
"""
if sys.platform == 'win32':
print_arch = '[x64]' if arch == KEY_WOW64_64KEY else '[x32]'
print(f"Removing registry key {print_arch}{str(os.path.join(registry_class_name[key], subkey))}")

try:
key_h = win32api.RegOpenKeyEx(key, subkey, 0, win32con.KEY_ALL_ACCESS | arch)
win32api.RegDeleteTree(key_h, None)
win32api.RegDeleteKeyEx(key, subkey, samDesired=arch)
except OSError as e:
print(f"Couldn't remove registry key {str(os.path.join(registry_class_name[key], subkey))}: {e}")
except pywintypes.error as e:
print(f"Couldn't remove registry key {str(os.path.join(registry_class_name[key], subkey))}: {e}")


def modify_registry_value(key_h, value_name, type, value):
"""
Modify the content of a registry. If the value doesn't not exists, it will be created.

Args:
key_h (pyHKEY): the key handle of the registry.
value_name (str): the value to be set.
type (int): type of the value.
value (str): the content that will be written to the registry value.
"""
if sys.platform == 'win32':
try:
print(f"Modifying value '{value_name}' of type {registry_value_type[type]} and value '{value}'")
win32api.RegSetValueEx(key_h, value_name, 0, type, value)
except OSError as e:
print(f"Could not modify registry value content: {e}")
except pywintypes.error as e:
print(f"Could not modify registry value content: {e}")


def generate_events(test_files, file_size, eps):
generated_events = 0
n_events = int(eps/len(test_files))
remain_events = eps % len(test_files)
for _ in range(n_events):
if sys.platform == 'win32':
random_string = ''.join(random.choice(string.ascii_letters) for _ in range(10))
for n_registry in range(1, len(test_files)+1):
key_h = win32api.RegOpenKeyEx(registry_parser[KEY], f'{testreg}{n_registry}', 0, KEY_ALL_ACCESS | KEY_WOW64_64KEY)
modify_registry_value(key_h, reg_value, REG_SZ, random_string)
generated_events += 1
else:
random_string = ''.join(random.choice(string.ascii_letters) for _ in range(file_size))
print(random_string)
Rebits marked this conversation as resolved.
Show resolved Hide resolved
for filename in test_files:
with open(os.path.join(monitored_directory, filename), 'w+') as f:
f.write(random_string)
generated_events += 1

random_string = ''.join(random.choice(string.ascii_letters) for _ in range(file_size))
for filename in test_files[0:remain_events]:
with open(os.path.join(monitored_directory, filename), 'w+') as f:
f.write(random_string)
generated_events += 1

print(f'Generated {generated_events} events')
Rebits marked this conversation as resolved.
Show resolved Hide resolved


def main(num_files, duration, eps, file_size):
if not os.path.exists(monitored_directory):
os.makedirs(monitored_directory)

test_files = [f"Testing{i}.txt" for i in range(1, num_files+1)]

start_time = time.time()

print(f'Start time: {start_time}')

while (time.time() - start_time) < duration:
generate_events(test_files, file_size, eps)
time.sleep(1)

print(f'Duration: {time.time() - start_time}')

if sys.platform == 'win32':
for n_registry in range(1, num_files+1):
delete_registry(registry_parser[KEY], f'{testreg}{n_registry}', KEY_WOW64_64KEY)
else:
if os.path.exists(monitored_directory):
shutil.rmtree(monitored_directory)


if __name__ == "__main__":

signal.signal(signal.SIGINT, signal_handler)

parser = argparse.ArgumentParser(description='File manipulation script')
parser.add_argument('--num-files', type=int, default=5, help='Number of files to create')
parser.add_argument('--duration', type=int, default=10, help='Duration of script execution in seconds')
parser.add_argument('--eps', type=int, default=10, help='Number of events per second')
parser.add_argument('--file-size', type=int, default=1024, help='File size in Bytes')
args = parser.parse_args()

main(args.num_files, args.duration, args.eps, args.file_size)
Loading