Updated recovery.py script in wazuh-central-components.rst #7822

Open · wants to merge 3 commits into master

Perform the following actions on both master and worker nodes of your Wazuh server.

Restored old logs will have a creation date of the day the restoration is performed.

#. Add the ``/tmp/wazuh-recovery/alerts.json`` path to the Wazuh Filebeat module ``/usr/share/filebeat/module/wazuh/alerts/manifest.yml`` so that Filebeat sends the old alerts to the Wazuh indexer for indexing:

.. code-block:: yaml
:emphasize-lines: 7

module_version: 0.1

var:
- name: paths
default:
- /var/ossec/logs/alerts/alerts.json
- /tmp/wazuh-recovery/alerts.json
- name: index_prefix
default: wazuh-alerts-4.x-

input: config/alerts.yml

ingest_pipeline: ingest/pipeline.json


To also recover the archives, add the ``/tmp/wazuh-recovery/archives.json`` path to the Wazuh Filebeat module ``/usr/share/filebeat/module/wazuh/archives/manifest.yml``:

.. code-block:: yaml
:emphasize-lines: 7

module_version: 0.1

var:
- name: paths
default:
- /var/ossec/logs/archives/archives.json
- /tmp/wazuh-recovery/archives.json
- name: index_prefix
default: wazuh-archives-4.x-

input: config/archives.yml

ingest_pipeline: ingest/pipeline.json

#. Restart Filebeat for the changes to take effect.

.. code-block:: console

# systemctl restart filebeat

#. Install PyYAML if it is not already present.

.. code-block:: console

# pip install PyYAML

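Optionally, you can use PyYAML to double-check that the recovery paths from the earlier steps are present in the Filebeat manifest. The sketch below is illustrative only and not part of the official procedure; it assumes the alerts manifest path used above.

.. code-block:: python

#!/bin/python3
# Illustrative sanity check: confirm the recovery path was added to the Filebeat manifest.
import yaml

manifest_path = '/usr/share/filebeat/module/wazuh/alerts/manifest.yml'  # use .../archives/manifest.yml for archives

with open(manifest_path) as f:
    manifest = yaml.safe_load(f)

# The "paths" variable should list the live log first and the recovery file second.
paths = next(var['default'] for var in manifest['var'] if var['name'] == 'paths')
print(paths)
assert '/tmp/wazuh-recovery/alerts.json' in paths, 'Recovery path missing from manifest'
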
#. Create a Python script called ``recovery.py`` on your Wazuh server. This script decompresses all the old logs and writes them to the recovery file defined in the Filebeat manifest: ``/tmp/wazuh-recovery/alerts.json`` by default, or ``/tmp/wazuh-recovery/archives.json`` when recovering archives.

.. code-block:: console

# touch /tmp/recovery.py

Copy the following content into the ``recovery.py`` script:

.. code-block:: python

#!/bin/python3
import gzip
import time
import yaml
import json
import argparse
import re
import os
from datetime import datetime
from datetime import timedelta



def log(msg):
    now_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    final_msg = f"{now_date} wazuh-reinjection: {msg}"
    print(final_msg)
    if log_file:
        f_log.write(final_msg + "\n")

log_file = None

logs_dir = 'alerts'
logs_name = 'alerts'

parser = argparse.ArgumentParser(description='Reinjection script')
parser.add_argument('-min', '--min_timestamp', metavar='min_timestamp', type=str, required=True,
help='Min timestamp. Example: 2017-12-13T23:59:06')
parser.add_argument('-max', '--max_timestamp', metavar='max_timestamp', type=str, required=True,
help='Max timestamp. Example: 2017-12-13T23:59:06')
parser.add_argument('-o', '--output_file', metavar='output_file', type=str, required=False,
help='Output filename. By default, reads it from filebeat manifest.yml')
parser.add_argument('-log', '--log_file', metavar='log_file', type=str, required=False, help='Logs output')
parser.add_argument('-arc', '--archives', action='store_true', help='Recover archives instead of alerts')

args = parser.parse_args()

if args.log_file:
    log_file = args.log_file
    f_log = open(log_file, 'a+')

if args.archives:
    logs_dir = 'archives'
    logs_name = 'archive'

if args.output_file:
    output_file = args.output_file
else:
    filebeat_manifest = f"/usr/share/filebeat/module/wazuh/{logs_dir}/manifest.yml"
    try:
        with open(filebeat_manifest, 'r') as file:
            manifest = yaml.safe_load(file)
        output_file = ''
        for el in manifest['var']:
            if el['name'] == 'paths':
                if len(output_file) > 0:
                    raise Exception(f'Multiple sections with "name: paths" in {filebeat_manifest}.')
                paths = el['default']
                if len(paths) < 2:
                    raise Exception(
                        f'There must be at least two paths in the section "default" in {filebeat_manifest}.')
                output_file = paths[1]
        if len(output_file) == 0:
            raise Exception(f'No section with "name: paths" in {filebeat_manifest}.')
    except Exception as e:
        log(f"Filebeat manifest.yml error: {e}")
        exit(1)

if os.path.exists(output_file):
    log(f"Error: Output file {output_file} already exists.")
    exit(1)


month_dict = ['Null', 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']

min_date = re.search('(\\d\\d\\d\\d)-(\\d\\d)-(\\d\\d)T\\d\\d:\\d\\d:\\d\\d', args.min_timestamp)
if min_date:
    min_year = int(min_date.group(1))
    min_month = int(min_date.group(2))
    min_day = int(min_date.group(3))
else:
    log("Error: Incorrect min timestamp")
    exit(1)

max_date = re.search('(\\d\\d\\d\\d)-(\\d\\d)-(\\d\\d)T\\d\\d:\\d\\d:\\d\\d', args.max_timestamp)
if max_date:
    max_year = int(max_date.group(1))
    max_month = int(max_date.group(2))
    max_day = int(max_date.group(3))
else:
    log("Error: Incorrect max timestamp")
    exit(1)

# Converting timestamp args to datetime
min_timestamp = datetime.strptime(args.min_timestamp, '%Y-%m-%dT%H:%M:%S')
max_timestamp = datetime.strptime(args.max_timestamp, '%Y-%m-%dT%H:%M:%S')
max_time = datetime(max_year, max_month, max_day)
ct = datetime(min_year, min_month, min_day)

os.makedirs(os.path.dirname(output_file), exist_ok=True)
with open(output_file, 'w') as trimmed_alerts:
    while ct <= max_time:
        alert_file = f"/var/ossec/logs/{logs_dir}/{ct.year}/{month_dict[ct.month]}/ossec-{logs_name}-{ct.day:02}.json.gz"

        if os.path.exists(alert_file):
            daily_alerts = 0
            with gzip.open(alert_file, 'r') as compressed_alerts:
                log("Reading file: " + alert_file)
                for line in compressed_alerts:
                    # Transform line to json object
                    try:
                        line_json = json.loads(line.decode("utf-8", "replace"))

                        # Remove unnecessary part of the timestamp
                        string_timestamp = line_json['timestamp'][:19]

                        # Ensure timestamp integrity
                        while len(line_json['timestamp'].split("+")[0]) < 23:
                            line_json['timestamp'] = line_json['timestamp'][:20] + "0" + line_json['timestamp'][20:]

                        # Get the timestamp readable
                        event_date = datetime.strptime(string_timestamp, '%Y-%m-%dT%H:%M:%S')

                        # Check the timestamp belongs to the selected range
                        if max_timestamp >= event_date >= min_timestamp:
                            trimmed_alerts.write(json.dumps(line_json))
                            trimmed_alerts.write("\n")
                            trimmed_alerts.flush()
                            daily_alerts += 1

                    except Exception as e:
                        log(f"Oops! Something went wrong reading line {line}. Error: {e}")
            log(f"Extracted {daily_alerts} alerts from day {ct.year}-{ct.month}-{ct.day}")
        else:
            log(f"Couldn't find file {alert_file}")
        ct += timedelta(days=1)


When running the ``recovery.py`` script, consider the following parameters:

.. code-block:: none

usage: recovery.py [-h] -min min_timestamp -max max_timestamp [-o output_file] [-log log_file] [-arc]

Reinjection script

options:
  -h, --help            show this help message and exit
  -min min_timestamp, --min_timestamp min_timestamp
                        Min timestamp. Example: 2017-12-13T23:59:06
  -max max_timestamp, --max_timestamp max_timestamp
                        Max timestamp. Example: 2017-12-13T23:59:06
  -o output_file, --output_file output_file
                        Output filename. By default, reads it from filebeat manifest.yml
  -log log_file, --log_file log_file
                        Logs output
  -arc, --archives      Recover archives instead of alerts


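The ``-min`` and ``-max`` values must use the exact ``%Y-%m-%dT%H:%M:%S`` layout that the script parses with ``datetime.strptime``. The short sketch below is only an illustration: it shows one value that parses and one that does not.

.. code-block:: python

#!/bin/python3
# Illustrative only: check a timestamp against the format recovery.py expects.
from datetime import datetime

fmt = '%Y-%m-%dT%H:%M:%S'

print(datetime.strptime('2024-06-10T00:00:00', fmt))   # parses: valid -min/-max value

try:
    datetime.strptime('2024-06-10 00:00:00', fmt)       # space instead of "T"
except ValueError as err:
    print(f"Rejected: {err}")
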
#. Run the command below to make the ``recovery.py`` script executable:

.. code-block:: console

# chmod +x /tmp/recovery.py

#. Run the ``recovery.py`` script. For example:

.. code-block:: console

# nohup ./recovery.py -min 2024-06-10T00:00:00 -max 2024-06-18T23:59:59 -arc &

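Before moving on to verification in the Wazuh indexer, you can optionally confirm that the recovery file is being populated. The following sketch is illustrative only and assumes the archives recovery path configured above; adjust the path when recovering alerts.

.. code-block:: python

#!/bin/python3
# Illustrative only: count the recovered events and report their time range.
import json

output_file = '/tmp/wazuh-recovery/archives.json'  # use alerts.json when recovering alerts

count = 0
first = last = None
with open(output_file) as f:
    for line in f:
        timestamp = json.loads(line)['timestamp'][:19]
        first = timestamp if first is None else min(first, timestamp)
        last = timestamp if last is None else max(last, timestamp)
        count += 1

print(f"{count} events between {first} and {last}")
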
Verifying data restoration
^^^^^^^^^^^^^^^^^^^^^^^^^^