diff --git a/mans_to_es/mans_to_es.py b/mans_to_es/mans_to_es.py index dbc22fe..829d12c 100644 --- a/mans_to_es/mans_to_es.py +++ b/mans_to_es/mans_to_es.py @@ -1,217 +1,149 @@ #!/usr/bin/env python3 - -import sys -import os +import os, sys +import argparse +import collections import json +import logging + import zipfile -import xmltodict -import collections +import shutil + import datetime +import ciso8601 + +import xmltodict + import pandas as pd -import argparse -import logging -from multiprocessing import cpu_count, Pool + +from multiprocessing import Pool, cpu_count + +import elasticsearch from elasticsearch import helpers, Elasticsearch +from glob import glob + # hide ES log es_logger = logging.getLogger("elasticsearch") es_logger.setLevel(logging.ERROR) url_logger = logging.getLogger("urllib3") url_logger.setLevel(logging.ERROR) +pd.options.mode.chained_assignment = None FORMAT = "%(asctime)-15s %(message)s" logging.basicConfig(filename="mans_to_es.log", level=logging.DEBUG, format=FORMAT) -type_name = { +MANS_FIELDS = { "persistence": { "key": "PersistenceItem", - "datefield": [ + "datefields": [ "RegModified", "FileCreated", "FileModified", "FileAccessed", "FileChanged", ], - "message_fields": [ - ["RegPath"], - ["FilePath"], - ["FilePath"], - ["FilePath"], - ["FilePath"], - ], + "message_fields": { + "RegModified": ["RegPath"], + "FileCreated": ["FilePath"], + "FileModified": ["FilePath"], + "FileAccessed": ["FilePath"], + "FileChanged": ["FilePath"], + }, "dateformat": "%Y-%m-%dT%H:%M:%SZ", }, "processes-api": { "key": "ProcessItem", - "datefield": ["startTime"], + "datefields": ["startTime"], "dateformat": "%Y-%m-%dT%H:%M:%SZ", - "message_fields": [["name"]], + "message_fields": {"startTime": ["name"]}, }, "processes-memory": { "key": "ProcessItem", - "datefield": ["startTime"], + "datefields": ["startTime"], "dateformat": "%Y-%m-%dT%H:%M:%SZ", - "message_fields": [["name"]], + "message_fields": {"startTime": ["name"]}, }, "urlhistory": { "key": "UrlHistoryItem", - "datefield": ["LastVisitDate"], + "datefields": ["LastVisitDate"], "dateformat": "%Y-%m-%dT%H:%M:%SZ", - "message_fields": [["URL"]], + "message_fields": {"LastVisitDate": ["URL"]}, }, "stateagentinspector": { "key": "eventItem", - "datefield": ["timestamp"], + "datefields": ["timestamp"], "dateformat": "%Y-%m-%dT%H:%M:%S.%fZ", - "subtypes": { - "addressNotificationEvent": { - "meta": ["message", "address", "datetime", "timestamp_desc"], - "message_fields": ["address"], - }, - "regKeyEvent": { - "meta": [ - "message", - "hive", - "keyPath", - "path", - "pid", - "process", - "processPath", - "username", - "datetime", - "timestamp_desc", - ], - "message_fields": ["keyPath"], - }, + "SubEventType": { + "addressNotificationEvent": {"message_fields": ["address"]}, + "regKeyEvent": {"message_fields": ["keyPath"]}, "ipv4NetworkEvent": { - "meta": [ - "message", - "localIP", - "localPort", - "pid", - "process", - "processPath", - "protocol", - "remoteIP", - "remotePort", - "username", - "datetime", - "timestamp_desc", - ], "message_fields": ["localIP", "remoteIP"], "hits_key": "EXC", }, "processEvent": { - "meta": [ - "message", - "md5", - "parentPid", - "parentProcess", - "parentProcessPath", - "pid", - "process", - "processCmdLine", - "processPath", - "startTime", - "username", - "datetime", - "timestamp_desc", - "eventType", - ], "message_fields": ["process", "eventType"], "hits_key": "EXC", }, - "imageLoadEvent": { - "meta": [ - "devicePath", - "drive", - "message", - "fileExtension", - "fileName", - "filePath", - "fullPath", - "pid", - "process", - "processPath", - "username", - "datetime", - "timestamp_desc", - ], - "message_fields": ["fileName"], - }, - "fileWriteEvent": { - "meta": [ - "closed", - "dataAtLowestOffset", - "devicePath", - "drive", - "message", - "fileExtension", - "fileName", - "filePath", - "fullPath", - "lowestFileOffsetSeen", - "md5", - "numBytesSeenWritten", - "pid", - "process", - "processPath", - "size", - "textAtLowestOffset", - "username", - "writes", - "datetime", - "timestamp_desc", - ], - "message_fields": ["fileName"], - "hits_key": "PRE", - }, + "imageLoadEvent": {"message_fields": ["fileName"]}, + "fileWriteEvent": {"message_fields": ["fileName"], "hits_key": "PRE"}, "dnsLookupEvent": { - "meta": [ - "message", - "hostname", - "pid", - "process", - "processPath", - "username", - "datetime", - "timestamp_desc", - ], + "meta": ["hostname", "pid", "process", "processPath", "username"], "message_fields": ["hostname"], }, - "urlMonitorEvent": { - "meta": [ - "message", - "hostname", - "requestUrl", - "urlMethod", - "userAgent", - "httpHeader", - "remoteIpAddress", - "remotePort", - "localPort", - "pid", - "process", - "processPath", - "username", - "datetime", - "timestamp_desc", - ], - "message_fields": ["requestUrl"], - }, + "urlMonitorEvent": {"message_fields": ["requestUrl"]}, }, }, "prefetch": { "key": "PrefetchItem", - "datefield": ["LastRun", "Created"], + "datefields": ["LastRun", "Created"], "dateformat": "%Y-%m-%dT%H:%M:%SZ", - "message_fields": [["ApplicationFileName"], ["ApplicationFileName"]], + "message_fields": { + "LastRun": ["ApplicationFileName"], + "Created": ["ApplicationFileName"], + }, }, "filedownloadhistory": { "key": "FileDownloadHistoryItem", - "datefield": ["LastModifiedDate", "LastAccessedDate", "StartDate", "EndDate"], + "datefields": ["LastModifiedDate", "LastAccessedDate", "StartDate", "EndDate"], + "dateformat": "%Y-%m-%dT%H:%M:%SZ", + "message_fields": { + "LastModifiedDate": ["SourceURL"], + "LastAccessedDate": ["SourceURL"], + "StartDate": ["SourceURL"], + "EndDate": ["SourceURL"], + }, + }, + "files-raw": { + "key": "FileItem", + "datefields": ["Created", "Modified", "Accessed", "Changed"], + "dateformat": "%Y-%m-%dT%H:%M:%SZ", + "message_fields": { + "Created": ["FullPath"], + "Modified": ["FullPath"], + "Accessed": ["FullPath"], + "Changed": ["FullPath"], + }, + }, + "cookiehistory": { + "key": "CookieHistoryItem", + "datefields": ["LastAccessedDate", "ExpirationDate"], + "dateformat": "%Y-%m-%dT%H:%M:%SZ", + "message_fields": { + "LastAccessedDate": ["HostName"], + "ExpirationDate": ["HostName"], + }, + }, + "eventlogs": { + "key": "EventLogItem", + "datefields": ["genTime"], + "dateformat": "%Y-%m-%dT%H:%M:%SZ", + "message_fields": {"genTime": ["EID", "source", "type"]}, + }, + "registry-raw": { + "key": "RegistryItem", + "datefields": ["Modified"], "dateformat": "%Y-%m-%dT%H:%M:%SZ", - "message_fields": [["SourceURL"], ["SourceURL"], ["SourceURL"], ["SourceURL"]], + "message_fields": {"Modified": ["KeyPath"]}, }, "tasks": {"key": "TaskItem", "skip": True}, "ports": {"key": "PortItem", "skip": True}, @@ -221,61 +153,65 @@ "network-dns": {"key": "DnsEntryItem", "skip": True}, "network-route": {"key": "RouteEntryItem", "skip": True}, "network-arp": {"key": "ArpEntryItem", "skip": True}, - "sysinfo": {"skip": True, "key": "SystemInfoItem"}, - "registry-raw": {"key": "RegistryItem", "skip": True}, + "sysinfo": {"key": "SystemInfoItem", "skip": True}, "services": {"key": "ServiceItem", "skip": True}, + "hivelist": {"key": "HiveItem", "skip": True}, + "drivers-modulelist": {"key": "ModuleItem", "skip": True}, + "drivers-signature": {"key": "DriverItem", "skip": True}, + "formhistory": {"key": "FormHistoryItem", "skip": True}, + "kernel-hookdetection": {"key": "HookItem", "skip": True}, } -def output_dict(details): +def convert_both_pandas(argument, offset=0): """ - Output_dict: details column in stateagentinspector df contains all the row info - In: - row: row of stateagentinspector file [could be a dict or a list of dict] - itemtype: stateagentinspector subtype - Out: - the details dict exploded in multiple columns + convert_both_pandas: parse date field and convert to it to proper + in: + argument: object to parse + out: + parsed data """ - detail = details.get("detail", []) - ret_value = {} - - if type(detail) in (collections.OrderedDict, dict): - ret_value[detail["name"]] = detail["value"] - elif type(detail) == list: - for i in detail: - ret_value[i["name"]] = i["value"] - return pd.Series(ret_value) + try: + d = ciso8601.parse_datetime(argument) + d += datetime.timedelta(seconds=offset) + return pd.Series( + [d.isoformat(timespec="seconds"), str(int(d.timestamp() * 1000000))] + ) + except (ValueError, OSError): + logging.warning(f"[MAIN - WARNING] date {str(argument)} not valid") + return pd.Series([None, None]) -def convert_date(argument, date_format="%Y-%m-%dT%H:%M:%S.%fZ"): +def convert_both(argument, offset=0): """ - convert_date: parse date field and convert to es format + convert_both: parse date field and convert to it to proper in: argument: object to parse - date_format: format of the argument field out: parsed data """ try: - d = datetime.datetime.strptime(argument, date_format) - iso_date = d.isoformat(timespec="seconds") - iso_date_new = iso_date + "+00:00" - return iso_date_new - except TypeError: - return None + d = ciso8601.parse_datetime(argument) + d += datetime.timedelta(seconds=offset) + return d.isoformat(timespec="seconds"), str(int(d.timestamp() * 1000000)) + except (ValueError, OSError): + logging.warning(f"[MAIN - WARNING] date {str(argument)} not valid") + return None, None -def convert_timestamp(argument, date_format="%Y-%m-%dT%H:%M:%S.%fZ"): +def convert_skew(offset): """ - convert_timestamp: parse date field and convert to timestamp + convert_skew: return offset for xml file in seconds in: - argument: object to parse - date_format: format of the argument field + offset: skew offset in custom format out: - parsed data + offset in secs or 0 if error """ - d = datetime.datetime.strptime(argument, date_format) - return str(int(d.timestamp() * 1000000)) + try: + return int(offset.replace("PT", "").replace("S", "")) + except: + logging.warning(f"[MAIN - WARNING] problem parsing skew: {offset}") + return 0 class MansToEs: @@ -285,60 +221,85 @@ def __init__(self, args): self.name = args.name self.bulk_size = args.bulk_size self.cpu_count = args.cpu_count - self.es_info = {"host": args.es_host, "port": args.es_port} self.folder_path = self.filename + "__tmp" + self.offset_stateagentinspector = None + self.es_info = {"host": args.es_host, "port": args.es_port} + self.upload_parts = [] self.filelist = {} - self.ioc_alerts = {} + self.ioc_alerts = [] self.exd_alerts = [] + self.generic_items = {} es = Elasticsearch([self.es_info]) if not es.ping(): raise ValueError("Connection failed") - logging.debug( - "Start parsing %s. Push on %s index and %s timeline" - % (args.filename, args.name, args.index) - ) + logging.debug(f"[MAIN] Start parsing {args.filename}.") + logging.debug(f"[MAIN] Pushing on {args.name} index and {args.index} timeline") - def get_hits(self): + def handle_stateagentinspector(self, path, item_detail): """ - Get hit and alert from hits.json file + handle_item: streaming function for xmltodict (stateagentitem) + In: + path: xml item path """ - with open(os.path.join(self.folder_path, "hits.json"), "r") as f: - for x in json.load(f): - if x.get("data", {}).get("key", None): - self.ioc_alerts.setdefault( - x["data"]["key"]["event_type"], [] - ).append(x["data"]["key"]["event_id"]) - elif x.get("data", {}).get("documents", None) or x.get("data", {}).get( - "analysis_details", None - ): - self.exd_alerts.append( - { - "source": x["source"], - "resolution": x["resolution"], - "process id": x["data"]["process_id"], - "process name": x["data"]["process_name"], - "alert_code": "XPL", - "datetime": convert_date( - x["data"]["earliest_detection_time"], - date_format="%Y-%m-%dT%H:%M:%SZ", - ), - "timestamp": convert_timestamp( - x["data"]["earliest_detection_time"], - date_format="%Y-%m-%dT%H:%M:%SZ", - ), - "ALERT": True, - "message": "PID: %s PROCESS: %s" - % (str(x["data"]["process_id"]), x["data"]["process_name"]), - } - ) - if len(self.exd_alerts) > 0: - es = Elasticsearch([self.es_info]) - helpers.bulk( - es, self.exd_alerts, index=self.index, doc_type="generic_event" + item = {} + uid = path[1][1]["uid"] + item["uid"] = uid + item["SubEventType"] = item_detail["eventType"] + # stateagentinspector has only timestamp field and is parsed now! + datetime, timestamp = convert_both( + item_detail["timestamp"], self.offset_stateagentinspector + ) + item["timestamp"] = timestamp + item["datetime"] = datetime + item["datetype"] = "timestamp" + item["message"] = item_detail["eventType"] + if type(item_detail["details"]["detail"]) in (collections.OrderedDict, dict): + x = item_detail["details"]["detail"] + else: + for x in item_detail["details"]["detail"]: + item[x["name"]] = x["value"] + if uid in self.ioc_alerts: + item["source"] = "IOC" + item["resolution"] = "ALERT" + item["ALERT"] = True + item["alert_code"] = ( + MANS_FIELDS["stateagentinspector"]["SubEventType"] + .get(item_detail["eventType"], {}) + .get("hits_key", None), ) - logging.debug("alert collected") + self.generic_items.setdefault(path[1][0], []).append(item) + return True + + def handle_item(self, path, item_detail): + """ + handle_item: streaming function for xmltodict + In: + path: xml item path + item_detail: xml item data + """ + item_detail["message"] = path[1][0] + self.generic_items.setdefault(path[1][0], []).append(item_detail) + return True + + def generate_df(self, file, offset, filetype, stateagentinspector=False): + """ + Generate dataframe from xml file + """ + xmltodict.parse( + file.read(), + item_depth=2, + item_callback=self.handle_stateagentinspector + if stateagentinspector + else self.handle_item, + ) + key_type = MANS_FIELDS[filetype]["key"] + if self.generic_items.get(key_type, []) == []: + return None, False + df = pd.DataFrame(self.generic_items[key_type]) + df["mainEventType"] = filetype + return df, True def extract_mans(self): """ @@ -347,7 +308,14 @@ def extract_mans(self): zip_ref = zipfile.ZipFile(self.filename, "r") zip_ref.extractall(self.folder_path) zip_ref.close() - logging.debug("File extracted in %s" % self.folder_path) + logging.debug(f"[MAIN] Unzip file in {self.folder_path} [✔]") + + def delete_temp_folder(self): + try: + shutil.rmtree(self.folder_path) + logging.debug("[MAIN] temporary folder deleted [✔]") + except: + logging.warning("[MAIN - WARNING] failed to delete temporary folder") def parse_manifest(self): """ @@ -360,176 +328,226 @@ def parse_manifest(self): self.filelist[item["generator"]] = [] for res in item["results"]: if res["type"] == "application/xml": - self.filelist[item["generator"]].append(res["payload"]) - logging.debug("Manifest.json parsed") + if item["generator"] == "stateagentinspector": + self.offset_stateagentinspector = convert_skew( + res["timestamps"][0]["skew"] + ) + self.filelist[item["generator"]].append( + (res["payload"], convert_skew(res["timestamps"][0]["skew"])) + ) + logging.debug("[MAIN] Parsing Manifest.json [✔]") + + def parse_hits(self): + """ + Get hit and alert from hits.json file + """ + if not os.path.exists(os.path.join(self.folder_path, "hits.json")): + logging.debug("[MAIN] Parsing Hits.json [missing]") + else: + with open(os.path.join(self.folder_path, "hits.json"), "r") as f: + for x in json.load(f): + if x.get("data", {}).get("key", None): + event_id = str(x["data"]["key"]["event_id"]) + if event_id not in self.ioc_alerts: + self.ioc_alerts.append(event_id) + elif x.get("data", {}).get("documents", None) or x.get( + "data", {} + ).get("analysis_details", None): + (alert_datetime, alert_timestamp) = convert_both( + x["data"]["earliest_detection_time"], 0 # ?? + ) + self.exd_alerts.append( + { + "source": x["source"], + "resolution": x["resolution"], + "process id": x["data"]["process_id"], + "process name": x["data"]["process_name"], + "alert_code": "XPL", + "datetime": alert_datetime, + "timestamp": alert_timestamp, + "ALERT": True, + "message": "PID: %s PROCESS: %s" + % ( + str(x["data"]["process_id"]), + x["data"]["process_name"], + ), + } + ) + if len(self.exd_alerts) > 0: + es = Elasticsearch([self.es_info]) + helpers.bulk( + es, self.exd_alerts, index=self.index, doc_type="generic_event" + ) + logging.debug( + "[MAIN] Parsing Hits.json - %d alerts found [✔]" + % (len(self.exd_alerts) + len(self.ioc_alerts)) + ) def process(self): """ Process all files contained in .mans extracted folder """ + files_list = [] for filetype in self.filelist.keys(): - # If filetype is new for now it's skipped - if filetype not in type_name.keys(): - logging.debug( - "Filetype: %s not recognize. Send us a note! - SKIPPED" - % type_name[filetype]["key"] + if filetype not in MANS_FIELDS.keys(): + logging.warning( + f"[MAIN] {filetype} filetype not recognize. Send us a note! - SKIP" ) continue - # Ignore items if not related to timeline # TODO: will use them in neo4j for relationship - if type_name[filetype].get("skip", False): - logging.debug("Filetype: %s - SKIPPED" % type_name[filetype]["key"]) + if MANS_FIELDS[filetype].get("skip", False): + logging.debug(f"[MAIN] SKIP {filetype}") continue - logging.debug("Filetype: %s - START" % type_name[filetype]["key"]) - # Read all files related to the type - for file in self.filelist[filetype]: + for (file, offset) in self.filelist[filetype]: + files_list.append((filetype, file, offset)) - logging.debug("Opening %s [%s]" % (file, type_name[filetype]["key"])) + with Pool(processes=self.cpu_count) as pool: + res = pool.starmap_async(self.process_file, files_list).get() + logging.debug("[MAIN] Pre-Processing [✔]") - with open( - os.path.join(self.folder_path, file), "r", encoding="utf8" - ) as f: - df_xml = ( - xmltodict.parse(f.read()) - .get("itemList", {}) - .get(type_name[filetype]["key"], {}) - ) - if df_xml == {}: - logging.debug("\tEmpty file - SKIPPED") - continue - df = pd.DataFrame(df_xml) - - # check all date field, if not present remove them, if all not valid skip - datefields = [ - x for x in type_name[filetype]["datefield"] if x in df.columns - ] - if len(datefields) == 0: + def process_file(self, filetype, file, offset): + """ + process_file: parse xml to df and clean it + In: + filetype: filetype of the xml + file: xml file pointer + offset: offset to add to date fields + """ + info = MANS_FIELDS[filetype] + + logging.debug(f"[{filetype:<20} {file}] df [ ] - date [ ] - message [ ]") + df, valid = self.generate_df( + open(os.path.join(self.folder_path, file), "r", encoding="utf8"), + offset, + filetype, + filetype == "stateagentinspector", + ) + + if not valid: + logging.error(f"[{filetype:<20} {file}] -- XML not valid or empty") + return + + # check all date field, if not present remove them, if all not valid skip + datefields = [x for x in info["datefields"] if x in df.columns] + if len(datefields) == 0: + logging.debug(f"[{filetype:<20} {file}] No valid time field - SKIP") + return + + if filetype == "stateagentinspector": + # each subtype has different fields and message fields + subevents = df["SubEventType"].unique() + # logging.debug(f"[{filetype:<20} {file}] contains followings subtypes: {subevents}") + for sb in subevents: + + # if it's new we cannot continue + if sb not in info["SubEventType"].keys(): logging.debug( - "Filetype: %s has no valid time field - SKIPPED" - % type_name[filetype]["key"] + f"[{filetype:<20} {sb:<24} {file}] -- new subtype found. Send us a note!" ) continue - # if not valid date field drop them - df = df.dropna(axis=0, how="all", subset=datefields) - - # stateagentinspector have in eventType the main subtype and in timestamp usually the relative time - if filetype == "stateagentinspector": - df = df.rename(columns={"eventType": "message"}) - df["datetime"] = df["timestamp"].apply(lambda x: convert_date(x)) - df["timestamp"] = df["datetime"].apply( - lambda x: convert_timestamp( - x, date_format="%Y-%m-%dT%H:%M:%S+00:00" + # take only valid column for that subtype + subdf = df[df["SubEventType"] == sb] + + # add messages based on selected fields value + if info["SubEventType"][sb].get("message_fields", None): + subdf.loc[:, "message"] = subdf.apply( + lambda row: " - ".join( + [row["message"]] + + [ + str(row[mf]) + for mf in info["SubEventType"][sb]["message_fields"] + if row.get(mf, None) + ] ) + + " [%s]" % row["datetype"], + axis=1, ) else: - df["message"] = filetype - # convert all export date fields to default format - for datefield in datefields: - df[datefield] = df[datefield].apply( - lambda x: convert_date(x, type_name[filetype]["dateformat"]) - ) - df = df.drop(["@created", "@sequence_num"], axis=1, errors="ignore") - logging.debug("\tPreprocessing done") - - # stateagentinspector is big and converted in parallel - if filetype == "stateagentinspector": - pieces = [] - - for itemtype in [x for x in type_name[filetype]["subtypes"].keys()]: - tmp_df = df[df.message == itemtype].reset_index() - for i in range(0, len(tmp_df), self.bulk_size): - pieces.append( - (tmp_df.loc[i: i + self.bulk_size - 1, :], itemtype) - ) - with Pool(processes=self.cpu_count) as pool: - pool.starmap_async( - self.explode_stateagentinspector, pieces - ).get() - else: - # explode if multiple date - list_dd = [] - df_dd = pd.DataFrame() - df_tmp = df[[x for x in df.columns if x not in datefields]] - for index, x in enumerate(datefields): - df_tmp2 = df_tmp.copy() - df_tmp2["datetime"] = df[[x]] - df_tmp2 = df_tmp2[df_tmp2["datetime"].notnull()] - df_tmp2["timestamp"] = df_tmp2["datetime"].apply( - lambda x: convert_timestamp( - x, date_format="%Y-%m-%dT%H:%M:%S+00:00" - ) - ) - if type_name[filetype].get("message_fields", None): - for mf in type_name[filetype]["message_fields"][index]: - df_tmp2["message"] += " - " + df_tmp2[mf] - df_tmp2["message"] += " [%s]" % x - list_dd.append(df_tmp2) - df_dd = pd.concat(list_dd, sort=False) - - df_dd["timestamp_desc"] = df_dd["message"] - self.to_elastic(df_dd) - logging.debug("\tUpload done") - logging.debug("completed") - - def explode_stateagentinspector(self, edf, itemtype): - """ - explode_stateagentinspector: parse stateagentinspector file - In: - edf: piece of stateagentinspector file - itemtype: subtype of processes piece - """ - subtype = type_name["stateagentinspector"]["subtypes"] + subdf.loc[:, "message"] = subdf.apply( + lambda row: row["message"] + " [%s]" % row["datetype"], axis=1 + ) + subdf.loc[:, "timestamp_desc"] = subdf.loc[:, "message"] + logging.debug( + f"[{filetype:<20} {sb:<24} {file}] df [✔] - date [✔] - message [✔]" + ) + subdf.dropna(axis=1, how="all").to_json( + os.path.join(self.folder_path, f"tmp___{sb}_{file}.json"), + orient="records", + lines=True, + ) + else: + logging.debug(f"[{filetype:<20} {file}] df [✔] - date [ ] - message [ ]") + # melt multiple date fields + if len(datefields) > 1: + df = df.melt( + id_vars=[x for x in df.columns if x not in datefields], + var_name="datetype", + value_name="datetime", + ) + else: + df["datetype"] = datefields[0] + df = df.rename(columns={datefields[0]: "datetime"}) + df = df[df["datetime"].notnull()] + + # convert datetime to default format + df[["datetime", "timestamp"]] = df["datetime"].apply( + lambda x: convert_both_pandas(x, offset) + ) - end = pd.concat( - [ - edf, - edf.apply( - lambda row: output_dict(row.details), + logging.debug(f"[{filetype:<20} {file}] df [✔] - date [✔] - message [ ]") + + # Add messages based on selected fields value + if info.get("message_fields", None): + df.loc[:, "message"] = df.apply( + lambda row: " - ".join( + [row["message"]] + + [ + str(row[mf]) + for mf in info["message_fields"][row["datetype"]] + if row.get(mf, None) + ] + ) + + " [%s]" % row["datetype"], axis=1, - result_type="expand", - ), - ], - axis=1, - ) - - end[["source", "resolution", "ALERT", "alert_code"]] = end["@uid"].apply( - lambda x: pd.Series( - { - "source": "IOC", - "resolution": "ALERT", - "ALERT": True, - "alert_code": subtype[itemtype].get("hits_key", None), - } - ) - if itemtype in self.ioc_alerts.keys() - and int(x) in self.ioc_alerts[itemtype] - else pd.Series( - {"source": None, "resolution": None, "ALERT": None, "alert_code": None} + ) + else: + df.loc[:, "message"] = df.apply( + lambda row: row["message"] + " [%s]" % row["datetype"], axis=1 + ) + df.loc[:, "timestamp_desc"] = df.loc[:, "message"] + logging.debug(f"[{filetype:<20} {file}] df [✔] - date [✔] - message [✔]") + df.dropna(axis=1, how="all").to_json( + os.path.join(self.folder_path, f"tmp___{file}.json"), + orient="records", + lines=True, ) - ) + del df - end = end.drop(["details"], axis=1, errors="ignore") - if subtype[itemtype].get("message_fields", None): - for mf in subtype[itemtype]["message_fields"]: - end["message"] += " - " + end[mf] - end["timestamp_desc"] = end["message"] - self.to_elastic(end) - logging.debug("\t\tUpload part - done") - - def to_elastic(self, end): + def to_elastic(self): """ to_elastic: push dataframe to elastic index - In: - end: dataframe to push """ + elk_items = [] + for file in glob(self.folder_path + "/tmp__*.json"): + elk_items += open(file, "r").readlines() + logging.debug(f"[MAIN] Pushing {len(elk_items)} items to elastic") es = Elasticsearch([self.es_info]) - data = end.to_json(orient="records") - data = json.loads(data) - helpers.bulk(es, data, index=self.index, doc_type="generic_event") + collections.deque( + helpers.parallel_bulk( + es, + elk_items, + index=self.index, + doc_type="generic_event", + chunk_size=self.bulk_size, + request_timeout=60, + ), + maxlen=0, + ) + logging.debug("[MAIN] Parallel elastic push [✔]") def main(): @@ -560,7 +578,7 @@ def main(): ) parser.add_argument( - "--version", dest="version", action="version", version="%(prog)s 1.3" + "--version", dest="version", action="version", version="%(prog)s 1.4" ) args = parser.parse_args() @@ -571,8 +589,11 @@ def main(): mte = MansToEs(args) mte.extract_mans() mte.parse_manifest() - mte.get_hits() + mte.parse_hits() mte.process() + mte.to_elastic() + mte.delete_temp_folder() + logging.debug("[MAIN] Operation Completed [✔✔✔]") except: logging.exception("Error parsing .mans") return False