diff --git a/models/Configuration.md b/models/Configuration.md
new file mode 100644
index 0000000..7ba508a
--- /dev/null
+++ b/models/Configuration.md
@@ -0,0 +1,46 @@
+## Configuration
+
+All URI Drain configuration is done through the `uri_drain.ini` file. [Here is a specific demo](../servers/simple/uri_drain.ini).
+
+### Snapshot
+
+Snapshots serialize and store the analysis results currently held in the system.
+For now, snapshots can only be saved to the file system.
+
+| Name                      | Type(Unit)  | Default | Description                                                                                |
+|---------------------------|-------------|---------|--------------------------------------------------------------------------------------------|
+| file_path                 | string      | /tmp/   | The directory where snapshots are saved. Persistence is disabled when the value is empty.  |
+| snapshot_interval_minutes | int(minute) | 10      | The interval at which snapshots are saved.                                                 |
+| compress_state            | bool        | True    | Whether to compress the snapshot with zlib and encode it in base64.                        |
+
+### Masking
+
+When variable parts of a URI are detected, Masking determines how they are rendered in the aggregated pattern.
+
+Currently, all variable content is replaced with `{var}` by default.
+
+| Name        | Type(Unit) | Default | Description                       |
+|-------------|------------|---------|-----------------------------------|
+| mask_prefix | string     | {       | The prefix of the mask token.     |
+| mask_suffix | string     | }       | The suffix of the mask token.     |
+
+### Drain
+
+Drain is the core algorithm of URI Drain.
+
+| Name             | Type(Unit) | Default | Description                                                                                                                            |
+|------------------|------------|---------|------------------------------------------------------------------------------------------------------------------------------------------|
+| sim_th           | float      | 0.4     | The similarity threshold used to decide whether a new sequence is merged into an existing cluster.                                    |
+| depth            | int        | 4       | Max depth of the pattern tree. The minimum is 2.                                                                                      |
+| max_children     | int        | 100     | Max number of children of an internal node.                                                                                           |
+| max_clusters     | int        | 1024    | Max number of tracked clusters. When this number is reached, the model starts replacing old clusters with new ones per the LRU policy. |
+| extra_delimiters | string     | /       | Extra delimiters used to split the sequence.                                                                                          |
+
+### Profiling
+
+Profiling reports performance statistics of the algorithm.
+
+| Name       | Type(Unit)  | Default | Description                                              |
+|------------|-------------|---------|----------------------------------------------------------|
+| enabled    | bool        | False   | Whether to enable profiling.                             |
+| report_sec | int(second) | 30      | The interval at which profiling information is reported. |
\ No newline at end of file
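For orientation, a minimal sketch of how these options surface in code once loaded, assuming the demo `servers/simple/uri_drain.ini` from this change and the defaults documented above:

```python
from models.uri_drain.template_miner_config import TemplateMinerConfig

config = TemplateMinerConfig()
config.load(config_filename='servers/simple/uri_drain.ini')

print(config.snapshot_file_dir)          # '/tmp/', so persistence is enabled
print(config.snapshot_interval_minutes)  # 10
print(config.snapshot_compress_state)    # True
print(config.drain_sim_th)               # 0.4
print(config.drain_depth)                # 4
```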
diff --git a/models/README.md b/models/README.md
index fe9c47d..caf3ec8 100644
--- a/models/README.md
+++ b/models/README.md
@@ -6,6 +6,8 @@ algorithm.
 The original paper of Drain can be found [here](https://jiemingzhu.github.io/pub/pjhe_icws2017.pdf) and
 [here](https://arxiv.org/pdf/1806.04356.pdf).
 
+For configuration, please refer to the [Configuration Documentation](./Configuration.md).
+
 #### Upstream Drain3 version
 
 - Currently
diff --git a/models/uri_drain/persistence_handler.py b/models/uri_drain/persistence_handler.py
index 86e30ef..6888715 100644
--- a/models/uri_drain/persistence_handler.py
+++ b/models/uri_drain/persistence_handler.py
@@ -1,6 +1,8 @@
 # SPDX-License-Identifier: MIT
-
+import base64
+import os
 from abc import ABC, abstractmethod
+from pathlib import Path
 
 
 class PersistenceHandler(ABC):
@@ -12,3 +14,34 @@ def save_state(self, state):
     @abstractmethod
     def load_state(self):
         pass
+
+
+class ServiceFilePersistenceHandler(PersistenceHandler):
+
+    def __init__(self, base_dir, service):
+        self.file_path = os.path.join(base_dir, 'services', base64.b64encode(service.encode('utf-8')).decode('utf-8'))
+        path = Path(self.file_path)
+        path.parent.mkdir(parents=True, exist_ok=True)
+        path.touch(exist_ok=True)
+
+    def save_state(self, state):
+        with open(self.file_path, 'wb') as file:
+            file.write(state)
+
+    def load_state(self):
+        with open(self.file_path, 'rb') as file:
+            return file.read()
+
+
+class ServicePersistentLoader:
+
+    def __init__(self, base_dir):
+        self.file_path = os.path.join(base_dir, 'services')
+
+    def load_services(self):
+        services = []
+        if os.path.isdir(self.file_path):
+            for entry in os.listdir(self.file_path):
+                if os.path.isfile(os.path.join(self.file_path, entry)):
+                    services.append(base64.b64decode(entry.encode('utf-8')).decode('utf-8'))
+        return services
diff --git a/models/uri_drain/template_miner.py b/models/uri_drain/template_miner.py
index 77562fe..e49f854 100644
--- a/models/uri_drain/template_miner.py
+++ b/models/uri_drain/template_miner.py
@@ -5,17 +5,19 @@
 import re
 import time
 import zlib
+from collections import defaultdict
 from typing import Optional, List, NamedTuple
 
 import jsonpickle
 from cachetools import LRUCache, cachedmethod
-from models.uri_drain.uri_drain import Drain, LogCluster
 
 # from drain3.jaccard_drain import JaccardDrain  # MODIFIED:: NOT USED AT ALL
 from models.uri_drain.masking import LogMasker
-from models.uri_drain.persistence_handler import PersistenceHandler
-from models.utils.simple_profiler import SimpleProfiler, NullProfiler, Profiler
+from models.uri_drain.persistence_handler import PersistenceHandler, ServicePersistentLoader, \
+    ServiceFilePersistenceHandler
 from models.uri_drain.template_miner_config import TemplateMinerConfig
+from models.uri_drain.uri_drain import Drain, LogCluster
+from models.utils.simple_profiler import SimpleProfiler, NullProfiler, Profiler
 
 logger = logging.getLogger(__name__)
@@ -24,6 +26,18 @@
 ExtractedParameter = NamedTuple("ExtractedParameter", [("value", str), ("mask_name", str)])
 
 
+def load_existing_miners(config: TemplateMinerConfig):
+    if config.snapshot_file_dir is None:
+        return {}
+    existing_services = ServicePersistentLoader(config.snapshot_file_dir).load_services()
+    miners = defaultdict(TemplateMiner)
+    if len(existing_services) > 0:
+        logger.info(f'Detected {len(existing_services)} services from disk')
+    for service in existing_services:
+        miners[service] = TemplateMiner(ServiceFilePersistenceHandler(config.snapshot_file_dir, service), config)
+    return miners
+
+
 class TemplateMiner:
 
     def __init__(self,
@@ -87,7 +101,7 @@ def load_state(self):
         logger.info("Checking for saved state")
 
         state = self.persistence_handler.load_state()
-        if state is None:
+        if state is None or state == b'':
             logger.info("Saved state not found")
             return
 
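The snapshot-per-service layout these handlers produce can be illustrated with a small standalone sketch; the base directory and service name below are hypothetical:

```python
import base64
import os

base_dir = '/tmp/'            # hypothetical; matches the demo config's file_path
service = 'example-service'   # hypothetical service name

# ServiceFilePersistenceHandler stores each service's state under
# <base_dir>/services/<base64(service)>
encoded = base64.b64encode(service.encode('utf-8')).decode('utf-8')
print(os.path.join(base_dir, 'services', encoded))
# /tmp/services/ZXhhbXBsZS1zZXJ2aWNl

# ServicePersistentLoader.load_services() reverses the encoding to
# rediscover the service names on restart
print(base64.b64decode(encoded.encode('utf-8')).decode('utf-8'))
# example-service
```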
diff --git a/models/uri_drain/template_miner_config.py b/models/uri_drain/template_miner_config.py
index 708d049..6b0a65c 100644
--- a/models/uri_drain/template_miner_config.py
+++ b/models/uri_drain/template_miner_config.py
@@ -17,6 +17,7 @@ def __init__(self):
         self.profiling_report_sec = 60
         self.snapshot_interval_minutes = 5
         self.snapshot_compress_state = True
+        self.snapshot_file_dir = None
         self.drain_extra_delimiters = []
         self.drain_sim_th = 0.4
         self.drain_depth = 4
@@ -50,6 +51,9 @@ def load(self, config_filename: str):
                                                               fallback=self.snapshot_interval_minutes)
         self.snapshot_compress_state = parser.getboolean(section_snapshot, 'compress_state',
                                                          fallback=self.snapshot_compress_state)
+        file_path = parser.get(section_snapshot, 'file_path', fallback=None)
+        if file_path:
+            self.snapshot_file_dir = file_path
 
         drain_extra_delimiters_str = parser.get(section_drain, 'extra_delimiters',
                                                 fallback=str(self.drain_extra_delimiters))
diff --git a/models/uri_drain/uri_drain.py b/models/uri_drain/uri_drain.py
index 37c377f..055cf71 100644
--- a/models/uri_drain/uri_drain.py
+++ b/models/uri_drain/uri_drain.py
@@ -135,6 +135,11 @@ def __init__(self,
     def clusters(self):
         return self.id_to_cluster.values()
 
+    @property
+    def cluster_patterns(self):
+        sorted_drain_clusters = sorted(self.clusters, key=lambda it: it.size, reverse=True)
+        return [cluster.get_template() for cluster in sorted_drain_clusters]
+
     @staticmethod
     def has_numbers(s):
         return any(char.isdigit() for char in s)
diff --git a/servers/simple/run.py b/servers/simple/run.py
index c400ac9..1b480ad 100644
--- a/servers/simple/run.py
+++ b/servers/simple/run.py
@@ -12,10 +12,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import multiprocessing
+import os
+from os.path import dirname
 
-from servers.simple.worker import run_worker
-from servers.simple.server import run_server
+from models.uri_drain.template_miner import load_existing_miners
+from models.uri_drain.template_miner_config import TemplateMinerConfig
 from servers.simple.results_manager import ProxyURIDrainResultsManager, URIDrainResults
+from servers.simple.server import run_server
+from servers.simple.worker import run_worker
 
 
 def run():
@@ -25,12 +29,23 @@ def run():
     manager = ProxyURIDrainResultsManager()
     manager.start()
+    # Load config
+    config = TemplateMinerConfig()
+    config_file = os.path.join(dirname(__file__), "uri_drain.ini")
+    print(f'Searching for config file at {config_file}')
+    config.load(config_filename=config_file)  # TODO: inject config from env or elsewhere
+
     # SET DEBUG HERE! < TODO CONFIG FILE
     shared_results_object = manager.URIDrainResults(debug=False)  # noqa
 
     uri_main_queue = multiprocessing.Queue()
 
+    # Load existing miners and their clusters
+    miners = load_existing_miners(config)
+    for service in miners:
+        shared_results_object.set_dict_field(service=service, value=miners[service].drain.cluster_patterns)
+
     producer_process = multiprocessing.Process(target=run_server, args=(uri_main_queue, shared_results_object))
-    consumer_process = multiprocessing.Process(target=run_worker, args=(uri_main_queue, shared_results_object))
+    consumer_process = multiprocessing.Process(target=run_worker, args=(uri_main_queue, shared_results_object, config, miners))
 
     producer_process.start()
     consumer_process.start()
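The preload above publishes each recovered miner's `cluster_patterns` (templates sorted by cluster size, most frequently matched first) before the worker starts; the same property drives the worker loop. A minimal end-to-end sketch with hypothetical URIs, assuming it is run from the repository root:

```python
from models.uri_drain.template_miner import TemplateMiner
from models.uri_drain.template_miner_config import TemplateMinerConfig

config = TemplateMinerConfig()
config.load(config_filename='servers/simple/uri_drain.ini')

miner = TemplateMiner(None, config)  # no persistence handler: in-memory only
for uri in ('/api/users/1001', '/api/users/1002', '/api/users/1003'):
    miner.add_log_message(uri)

# most frequently matched templates first; with enough similar URIs the
# variable segment is masked, e.g. ['/api/users/{var}']
print(miner.drain.cluster_patterns)
```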
diff --git a/servers/simple/uri_drain.ini b/servers/simple/uri_drain.ini
index 83913fb..09643a9 100644
--- a/servers/simple/uri_drain.ini
+++ b/servers/simple/uri_drain.ini
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 [SNAPSHOT]
+file_path = /tmp/
 snapshot_interval_minutes = 10
 compress_state = True
 
@@ -36,4 +37,4 @@ extra_delimiters = ["/"]
 
 [PROFILING]
 enabled = False
-report_sec = 30
\ No newline at end of file
+report_sec = 30
diff --git a/servers/simple/worker.py b/servers/simple/worker.py
index 9ce0435..e3057f2 100644
--- a/servers/simple/worker.py
+++ b/servers/simple/worker.py
@@ -11,23 +11,29 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import functools
-import time
 import queue
-import os
+import time
 from collections import defaultdict
-from os.path import dirname
 
-from models.uri_drain.template_miner_config import TemplateMinerConfig
+from models.uri_drain.persistence_handler import ServiceFilePersistenceHandler
 from models.uri_drain.template_miner import TemplateMiner
 
 
-def run_worker(uri_main_queue, shared_results_object):
-    config = TemplateMinerConfig()
-    config_file = os.path.join(dirname(__file__), "uri_drain.ini")
-    print(f'Searching for config file at {config_file}')
-    config.load(config_filename=config_file)  # change to config injection from env or other
-    drain_instances = defaultdict(functools.partial(TemplateMiner, None, config))  # URIDrain instances
+def create_defaultdict_with_key(factory):
+    class CustomDefaultDict(defaultdict):
+        def __missing__(self, key):
+            value = factory(key)
+            self[key] = value
+            return value
+
+    return CustomDefaultDict()
+
+
+def run_worker(uri_main_queue, shared_results_object, config, existing_miners):
+    drain_instances = create_defaultdict_with_key(  # per-service URIDrain instances
+        lambda key: TemplateMiner(ServiceFilePersistenceHandler(config.snapshot_file_dir, key) if config.snapshot_file_dir else None, config))
+    for service in existing_miners:
+        drain_instances[service] = existing_miners[service]
 
     counter = 0
     while True:
@@ -44,11 +50,8 @@ def run_worker(uri_main_queue, shared_results_object):
         for uri in uris:
             drain_instances[service].add_log_message(uri)
         print(f'Processed {len(uris)} uris in {time.time() - start_time} seconds')
-        drain_clusters = drain_instances[service].drain.clusters
-        sorted_drain_clusters = sorted(drain_clusters, key=lambda it: it.size, reverse=True)
-
-        drain_clusters_templates = [cluster.get_template() for cluster in sorted_drain_clusters]
-        shared_results_object.set_dict_field(service=service, value=drain_clusters_templates)  # TODO add version
+        patterns = drain_instances[service].drain.cluster_patterns
+        shared_results_object.set_dict_field(service=service, value=patterns)  # TODO add version
         # increment here
         counter += 1
         print('-================-')
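The `create_defaultdict_with_key` helper exists because `collections.defaultdict` calls its factory with no arguments, while the worker needs the missing key (the service name) to build a correctly named persistence handler. A standalone sketch of the behavior, with a toy factory:

```python
def create_defaultdict_with_key(factory):
    # like collections.defaultdict, but the factory receives the missing key
    class CustomDefaultDict(dict):
        def __missing__(self, key):
            value = factory(key)
            self[key] = value
            return value

    return CustomDefaultDict()


miners = create_defaultdict_with_key(lambda service: f'miner-for-{service}')
print(miners['checkout'])  # 'miner-for-checkout', created on first access
print(miners)              # {'checkout': 'miner-for-checkout'}
```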