diff --git a/logger.py b/logger.py
index d48f971..614ef75 100644
--- a/logger.py
+++ b/logger.py
@@ -22,7 +22,7 @@ def get_logger(name=None): # noqa
     _logger = logging.getLogger(name)
     ch = logging.StreamHandler()
-    formatter = logging.Formatter('%(name)s [%(levelname)s] %(message)s')
+    formatter = logging.Formatter('%(asctime)s - %(name)s - Process-%(process)d - [%(levelname)s] %(message)s')
     ch.setFormatter(formatter)
     _logger.addHandler(ch)
     _logger.propagate = False
diff --git a/models/uri_drain/persistence_handler.py b/models/uri_drain/persistence_handler.py
index 6888715..95ba459 100644
--- a/models/uri_drain/persistence_handler.py
+++ b/models/uri_drain/persistence_handler.py
@@ -15,10 +15,15 @@ def save_state(self, state):
     def load_state(self):
         pass
 
+    @abstractmethod
+    def get_service(self):
+        pass
+
 
 class ServiceFilePersistenceHandler(PersistenceHandler):
     def __init__(self, base_dir, service):
+        self.service_name = service
         self.file_path = os.path.join(base_dir, 'services', base64.b64encode(service.encode('utf-8')).decode('utf-8'))
         path = Path(self.file_path)
         path.parent.mkdir(parents=True, exist_ok=True)
 
@@ -32,6 +37,9 @@ def load_state(self):
         with open(self.file_path, 'rb') as file:
             return file.read()
 
+    def get_service(self):
+        return self.service_name
+
 
 class ServicePersistentLoader:
diff --git a/models/uri_drain/template_miner.py b/models/uri_drain/template_miner.py
index 33683dc..6f96362 100644
--- a/models/uri_drain/template_miner.py
+++ b/models/uri_drain/template_miner.py
@@ -1,7 +1,7 @@
 # SPDX-License-Identifier: MIT
 
 import base64
-import logging
+import logger
 import re
 import time
 import zlib
@@ -19,7 +19,7 @@
 from models.uri_drain.uri_drain import Drain, LogCluster
 from models.utils.simple_profiler import SimpleProfiler, NullProfiler, Profiler
 
-logger = logging.getLogger(__name__)
+logger = logger.init_logger(name=__name__)
 
 config_filename = 'drain3.ini'
 
@@ -32,7 +32,7 @@ def load_existing_miners(config: TemplateMinerConfig = None):
     existing_services = ServicePersistentLoader(config.snapshot_file_dir).load_services()
     miners = defaultdict(TemplateMiner)
     if len(existing_services) > 0:
-        print(f'Detected {len(existing_services)} services from disk')
+        logger.info(f'Detected {len(existing_services)} services from disk')
         for service in existing_services:
             miners[service] = TemplateMiner(ServiceFilePersistenceHandler(config.snapshot_file_dir, service), config)
     return miners
@@ -48,7 +48,10 @@ def __init__(self,
         :param persistence_handler: The type of persistence to use. When None, no persistence is applied.
         :param config: Configuration object. When none, configuration is loaded from default .ini file (if exist)
         """
-        logger.info("Starting Drain3 template miner")
+        service = ""
+        if persistence_handler is not None:
+            service = persistence_handler.get_service()
+        logger.info(f"Starting Drain3 template miner for service {service}")
 
         if config is None:
             logger.info(f"Loading configuration from {config_filename}")
@@ -99,11 +102,11 @@ def __init__(self,
             self.load_state()
 
     def load_state(self):
-        logger.info("Checking for saved state")
+        logger.info(f"Checking for saved state of service {self.persistence_handler.get_service()}")
 
         state = self.persistence_handler.load_state()
         if state is None or state == b'':
-            logger.info("Saved state not found")
+            logger.info(f"Saved state not found for service {self.persistence_handler.get_service()}")
             return
 
         if self.config.snapshot_compress_state:
diff --git a/servers/simple/run.py b/servers/simple/run.py
index 462442d..483db63 100644
--- a/servers/simple/run.py
+++ b/servers/simple/run.py
@@ -14,6 +14,7 @@
 import multiprocessing
 import os
 from os.path import dirname
+import logger
 
 from models.uri_drain.template_miner import load_existing_miners
 from models.uri_drain.template_miner_config import TemplateMinerConfig
@@ -21,9 +22,11 @@
 from servers.simple.server import run_server
 from servers.simple.worker import run_worker
 
+logger = logger.init_logger(logging_level='INFO', name=__name__)
+
 
 def run():
-    print('Starting server from entrypoint...')
+    logger.info('Starting server from entrypoint...')
     ProxyURIDrainResultsManager.register("URIDrainResults", URIDrainResults)
 
     manager = ProxyURIDrainResultsManager()
@@ -32,7 +35,7 @@ def run():
     # Load config
     config = TemplateMinerConfig()
     config_file = os.path.join(dirname(__file__), "uri_drain.ini")
-    print(f'Searching for config file at {config_file}')
+    logger.info(f'Searching for config file at {config_file}')
     config.load(config_filename=config_file)  # change to config injection from env or other
 
     # SET DEBUG HERE! < TODO CONFIG FILE
@@ -45,7 +48,8 @@ def run():
         shared_results_object.set_dict_field(service=service, value=miners[service].drain.cluster_patterns)
 
     producer_process = multiprocessing.Process(target=run_server, args=(uri_main_queue, shared_results_object, config))
-    consumer_process = multiprocessing.Process(target=run_worker, args=(uri_main_queue, shared_results_object, config, miners))
+    consumer_process = multiprocessing.Process(target=run_worker,
+                                               args=(uri_main_queue, shared_results_object, config, miners))
 
     producer_process.start()
     consumer_process.start()
diff --git a/servers/simple/server.py b/servers/simple/server.py
index 1ac8884..3a3527e 100644
--- a/servers/simple/server.py
+++ b/servers/simple/server.py
@@ -18,12 +18,15 @@
 from concurrent import futures
 
 import grpc
+import logger
 from google.protobuf.empty_pb2 import Empty
 
 from servers.protos.generated import ai_http_uri_recognition_pb2
 from servers.protos.generated import ai_http_uri_recognition_pb2_grpc
 from servers.protos.generated.ai_http_uri_recognition_pb2 import Pattern
 
+logger = logger.init_logger(name=__name__)
+
 
 class HttpUriRecognitionServicer(ai_http_uri_recognition_pb2_grpc.HttpUriRecognitionServiceServicer):
     """
@@ -42,8 +45,7 @@ def __init__(self, uri_main_queue, shared_results_object, conf):
 
     async def fetchAllPatterns(self, request, context):
         # TODO OAP SIDE OR THIS SIDE must save the version, e.g. oap should check if version is > got version, since
         # this is a stateful service and it may crash and restart
-        print('-================-')
-        print(
+        logger.info(
             f'> Received fetchAllPatterns request for service <{request.service}>, '
             f'oap side version is: {request.version}')
@@ -54,21 +56,20 @@ async def fetchAllPatterns(self, request, context):
         # https://github.com/apache/skywalking/blob/master/oap-server/ai-pipeline/src/main/java/org
         # /apache/skywalking/oap/server/ai/pipeline/services/HttpUriRecognitionService.java#LL39C32-L39C32
         if version == request.version:  # Initial version is NULL
-            print('Version match, returning empty response')
+            logger.info('Version match, returning empty response')
             return ai_http_uri_recognition_pb2.HttpUriRecognitionResponse(patterns=[], version=version)
-        print(f'Version do not match, local:{version} vs oap:{request.version}')
+        logger.info(f'Versions do not match, local:{version} vs oap:{request.version}')
 
         cluster_candidates = self.shared_results_object.get_dict_field(request.service)
 
         patterns = []
+        count = 0
         for cluster in cluster_candidates:
             if '{var}' in cluster:
                 patterns.append(Pattern(pattern=cluster))
             else:
                 # TODO this is for post processing feature to be added
-                print("Skipping pattern without {var}, OAP won't need this")
-        print(f'Returning {len(patterns)} patterns')
-
-        print('-================-')
+                count += 1
+        logger.info(f'Returning {len(patterns)} patterns, ignored {count} patterns without {{var}}')
 
         return ai_http_uri_recognition_pb2.HttpUriRecognitionResponse(patterns=patterns, version=version)
@@ -78,7 +79,7 @@ async def feedRawData(self, request, context):
 
         There will always be a User service, its in topology, but it will not call fetchAllPatterns
         """
-        print(f'> Received feedRawData request for service {request.service}')
+        logger.info(f'> Received feedRawData request for service {request.service}')
         if request.service == 'User':
             # It should not be called
             return Empty()
@@ -89,7 +90,8 @@ async def feedRawData(self, request, context):
         # This is an experimental mechanism to avoid identifying non-restful uris unnecessarily.
         self.known_services[service] += len(set(uris))
         if self.known_services[service] < self.conf.drain_analysis_min_url_count:
-            print(f'Unique Uri count too low({self.known_services[service]} < {self.conf.drain_analysis_min_url_count}) for service {service}, skipping')
+            logger.info(
+                f'Unique Uri count too low({self.known_services[service]} < {self.conf.drain_analysis_min_url_count}) for service {service}, skipping')
             return Empty()
         self.uri_main_queue.put((uris, service))
         return Empty()
@@ -99,13 +101,14 @@ async def serve(uri_main_queue, shared_results_object, conf):
     server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
 
     ai_http_uri_recognition_pb2_grpc.add_HttpUriRecognitionServiceServicer_to_server(
-        HttpUriRecognitionServicer(uri_main_queue=uri_main_queue, shared_results_object=shared_results_object, conf=conf), server)
+        HttpUriRecognitionServicer(uri_main_queue=uri_main_queue, shared_results_object=shared_results_object,
+                                   conf=conf), server)
 
     server.add_insecure_port('[::]:17128')  # TODO: change to config injection
 
     await server.start()
-    print('Server started!')
+    logger.info('Server started at :17128!')
     await server.wait_for_termination()  # timeout=5
@@ -118,7 +121,7 @@ def run_server(uri_main_queue, shared_results_object, conf):
         sys.exit(loop.run_until_complete(serve(uri_main_queue, shared_results_object, conf)))
     except KeyboardInterrupt:
         # Optionally show a message if the shutdown may take a while
-        print("Attempting graceful shutdown, press Ctrl+C again to exit…", flush=True)
+        logger.info("Attempting graceful shutdown, press Ctrl+C again to exit…")
         quit()
 
     # TODO Handle interrupt and gracefully shutdown
@@ -126,6 +129,7 @@
     Learn from this
     https://stackoverflow.com/questions/30765606/whats-the-correct-way-to-clean-up-after-an-interrupted-event-loop
     """
+
     # Do not show `asyncio.CancelledError` exceptions during shutdown
     # (a lot of these may be generated, skip this if you prefer to see them)
     def shutdown_exception_handler(loop, context):
diff --git a/servers/simple/worker.py b/servers/simple/worker.py
index e3057f2..10c0c7a 100644
--- a/servers/simple/worker.py
+++ b/servers/simple/worker.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logger
 import queue
 import time
 from collections import defaultdict
@@ -18,6 +19,8 @@
 from models.uri_drain.persistence_handler import ServiceFilePersistenceHandler
 from models.uri_drain.template_miner import TemplateMiner
 
+logger = logger.init_logger(name=__name__)
+
 
 def create_defaultdict_with_key(factory):
     class CustomDefaultDict(defaultdict):
@@ -31,7 +34,9 @@ def __missing__(self, key):
 
 def run_worker(uri_main_queue, shared_results_object, config, existing_miners):
     drain_instances = create_defaultdict_with_key(lambda key:  # URIDrain instances
-                                                  TemplateMiner(ServiceFilePersistenceHandler(config.snapshot_file_dir, key) if config.snapshot_file_dir else None, config))
+                                                  TemplateMiner(ServiceFilePersistenceHandler(config.snapshot_file_dir,
+                                                                                              key) if config.snapshot_file_dir else None,
+                                                                config))
 
     for service in existing_miners:
         drain_instances[service] = existing_miners[service]
@@ -39,23 +44,20 @@ def run_worker(uri_main_queue, shared_results_object, config, existing_miners):
     while True:
         try:
             uri_package = uri_main_queue.get()
-            print('====================')
-            print(f'currently have drain instances for {len(drain_instances)} services')
-            print(f'drain_instances.keys() = {drain_instances.keys()}')
-            print('-================-')
+            logger.info(
+                f'Currently have drain instances for {len(drain_instances)} services, drain_instances.keys() = {drain_instances.keys()}, '
+                f'got uri package of length {len(uri_package[0])} for service <{uri_package[1]}>')
             uris, service = uri_package[0], uri_package[1]
             # print(uri_main_queue.get(timeout=1))
-            print(f'Got uri package of length {len(uri_package[0])} for service <{uri_package[1]}>')
             start_time = time.time()
             for uri in uris:
                 drain_instances[service].add_log_message(uri)
-            print(f'Processed {len(uris)} uris in {time.time() - start_time} seconds')
+            logger.info(f'Processed {len(uris)} uris of service {service} in {time.time() - start_time} seconds')
             patterns = drain_instances[service].drain.cluster_patterns
             shared_results_object.set_dict_field(service=service, value=patterns)
             # TODO add version # increment here
             counter += 1
-            print('-================-')
         except Exception as e:
-            print(f"catch an unexpected error occurred: {e}")
+            logger.error(f"Caught an unexpected error: {e}")
         except queue.Empty:
             # TODO Consider queue full
             pass
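Note: the new call sites (`logger.init_logger(name=__name__)` and `logger.init_logger(logging_level='INFO', name=__name__)`) rely on an `init_logger` helper in `logger.py` that this diff does not show; only `get_logger` appears in the hunk context above. The following is a minimal sketch of what such a helper might look like, assuming it mirrors the handler/formatter setup in `get_logger`. The body and the default `logging_level='INFO'` are illustrative assumptions, not part of this change.

# logger.py -- hypothetical sketch of the init_logger helper assumed by the call sites in this diff.
import logging


def init_logger(logging_level='INFO', name=None):
    # Signature matches how the new modules call it; the body is an assumption.
    _logger = logging.getLogger(name)
    ch = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - Process-%(process)d - [%(levelname)s] %(message)s')
    ch.setFormatter(formatter)
    _logger.addHandler(ch)
    _logger.setLevel(logging_level)  # accepts level names such as 'INFO'
    _logger.propagate = False
    return _logger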