Introduce logging framework to replace system print #18

Merged (1 commit) on Aug 15, 2024
logger.py: 2 changes (1 addition, 1 deletion)
@@ -22,7 +22,7 @@
 def get_logger(name=None):  # noqa
     _logger = logging.getLogger(name)
     ch = logging.StreamHandler()
-    formatter = logging.Formatter('%(name)s [%(levelname)s] %(message)s')
+    formatter = logging.Formatter('%(asctime)s - %(name)s - Thread-%(process)d - [%(levelname)s] %(message)s')
     ch.setFormatter(formatter)
     _logger.addHandler(ch)
     _logger.propagate = False
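The call sites added in this PR use logger.init_logger(logging_level=..., name=...), a helper this diff does not show. Below is a minimal sketch of such a helper, assuming it mirrors the get_logger setup above and applies the requested level; the name init_logger and its parameters are taken from the call sites, while the body is inferred rather than the PR's actual code.

# Inferred sketch; the real init_logger is not part of this diff.
import logging


def init_logger(logging_level='INFO', name=None):
    _logger = logging.getLogger(name)
    _logger.setLevel(logging_level)  # accepts level names such as 'INFO'
    ch = logging.StreamHandler()
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - Thread-%(process)d - [%(levelname)s] %(message)s')
    ch.setFormatter(formatter)
    _logger.addHandler(ch)
    _logger.propagate = False  # avoid duplicate records via the root logger
    return _logger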
models/uri_drain/persistence_handler.py: 8 changes (8 additions, 0 deletions)
@@ -15,10 +15,15 @@ def save_state(self, state):
     def load_state(self):
         pass
 
+    @abstractmethod
+    def get_service(self):
+        pass
+
 
 class ServiceFilePersistenceHandler(PersistenceHandler):
 
     def __init__(self, base_dir, service):
+        self.service_name = service
         self.file_path = os.path.join(base_dir, 'services', base64.b64encode(service.encode('utf-8')).decode('utf-8'))
         path = Path(self.file_path)
         path.parent.mkdir(parents=True, exist_ok=True)
@@ -32,6 +37,9 @@ def load_state(self):
         with open(self.file_path, 'rb') as file:
             return file.read()
 
+    def get_service(self):
+        return self.service_name
+
 
 class ServicePersistentLoader:
 
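Since get_service() is declared with @abstractmethod on the base class, every PersistenceHandler subclass must now implement it; ServiceFilePersistenceHandler satisfies the contract by storing the service name in __init__. As an illustration of the extended contract (not part of this PR), the smallest conforming handler could look like this:

# Illustrative only: a minimal in-memory handler satisfying the new interface.
from models.uri_drain.persistence_handler import PersistenceHandler


class InMemoryPersistenceHandler(PersistenceHandler):

    def __init__(self, service):
        self.service_name = service
        self.state = b''

    def save_state(self, state):
        self.state = state  # kept in memory, lost on restart

    def load_state(self):
        return self.state

    def get_service(self):
        return self.service_name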
models/uri_drain/template_miner.py: 15 changes (9 additions, 6 deletions)
@@ -1,7 +1,7 @@
 # SPDX-License-Identifier: MIT
 
 import base64
-import logging
+import logger
 import re
 import time
 import zlib
@@ -19,7 +19,7 @@
 from models.uri_drain.uri_drain import Drain, LogCluster
 from models.utils.simple_profiler import SimpleProfiler, NullProfiler, Profiler
 
-logger = logging.getLogger(__name__)
+logger = logger.init_logger(name=__name__)
 
 config_filename = 'drain3.ini'
 
@@ -32,7 +32,7 @@ def load_existing_miners(config: TemplateMinerConfig = None):
     existing_services = ServicePersistentLoader(config.snapshot_file_dir).load_services()
     miners = defaultdict(TemplateMiner)
     if len(existing_services) > 0:
-        print(f'Detected {len(existing_services)} services from disk')
+        logger.info(f'Detected {len(existing_services)} services from disk')
     for service in existing_services:
         miners[service] = TemplateMiner(ServiceFilePersistenceHandler(config.snapshot_file_dir, service), config)
     return miners
@@ -48,7 +48,10 @@ def __init__(self,
         :param persistence_handler: The type of persistence to use. When None, no persistence is applied.
         :param config: Configuration object. When none, configuration is loaded from default .ini file (if exist)
         """
-        logger.info("Starting Drain3 template miner")
+        service = ""
+        if persistence_handler is not None:
+            service = persistence_handler.get_service()
+        logger.info(f"Starting Drain3 template miner of service {service}")
 
         if config is None:
             logger.info(f"Loading configuration from {config_filename}")
@@ -99,11 +102,11 @@ def __init__(self,
         self.load_state()
 
     def load_state(self):
-        logger.info("Checking for saved state")
+        logger.info(f"Checking for saved state of service {self.persistence_handler.get_service()}")
 
         state = self.persistence_handler.load_state()
         if state is None or state == b'':
-            logger.info("Saved state not found")
+            logger.info(f"Saved state not found of service {self.persistence_handler.get_service()}")
             return
 
         if self.config.snapshot_compress_state:
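Combined with the new formatter in logger.py, the service-aware messages render roughly as follows (timestamp, process id, and service name are illustrative; note the PR's format string labels the process id as Thread-):

2024-08-15 10:21:03,512 - models.uri_drain.template_miner - Thread-73 - [INFO] Starting Drain3 template miner of service serviceA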
servers/simple/run.py: 10 changes (7 additions, 3 deletions)
@@ -14,16 +14,19 @@
 import multiprocessing
 import os
 from os.path import dirname
+import logger
 
 from models.uri_drain.template_miner import load_existing_miners
 from models.uri_drain.template_miner_config import TemplateMinerConfig
 from servers.simple.results_manager import ProxyURIDrainResultsManager, URIDrainResults
 from servers.simple.server import run_server
 from servers.simple.worker import run_worker
 
+logger = logger.init_logger(logging_level='INFO', name=__name__)
+
 
 def run():
-    print('Starting server from entrypoint...')
+    logger.info('Starting server from entrypoint...')
     ProxyURIDrainResultsManager.register("URIDrainResults", URIDrainResults)
 
     manager = ProxyURIDrainResultsManager()
@@ -32,7 +35,7 @@ def run():
     # Load config
     config = TemplateMinerConfig()
     config_file = os.path.join(dirname(__file__), "uri_drain.ini")
-    print(f'Searching for config file at {config_file}')
+    logger.info(f'Searching for config file at {config_file}')
     config.load(config_filename=config_file)  # change to config injection from env or other
 
     # SET DEBUG HERE! < TODO CONFIG FILE
@@ -45,7 +48,8 @@ def run():
         shared_results_object.set_dict_field(service=service, value=miners[service].drain.cluster_patterns)
 
     producer_process = multiprocessing.Process(target=run_server, args=(uri_main_queue, shared_results_object, config))
-    consumer_process = multiprocessing.Process(target=run_worker, args=(uri_main_queue, shared_results_object, config, miners))
+    consumer_process = multiprocessing.Process(target=run_worker,
+                                               args=(uri_main_queue, shared_results_object, config, miners))
 
     producer_process.start()
     consumer_process.start()
servers/simple/server.py: 30 changes (17 additions, 13 deletions)
@@ -18,12 +18,15 @@
 from concurrent import futures
 
 import grpc
+import logger
 from google.protobuf.empty_pb2 import Empty
 
 from servers.protos.generated import ai_http_uri_recognition_pb2
 from servers.protos.generated import ai_http_uri_recognition_pb2_grpc
 from servers.protos.generated.ai_http_uri_recognition_pb2 import Pattern
 
+logger = logger.init_logger(name=__name__)
+
 
 class HttpUriRecognitionServicer(ai_http_uri_recognition_pb2_grpc.HttpUriRecognitionServiceServicer):
     """
@@ -42,8 +45,7 @@ def __init__(self, uri_main_queue, shared_results_object, conf):
     async def fetchAllPatterns(self, request, context):
         # TODO OAP SIDE OR THIS SIDE must save the version, e.g. oap should check if version is > got version, since
         # this is a stateful service and it may crash and restart
-        print('-================-')
-        print(
+        logger.info(
             f'> Received fetchAllPatterns request for service <{request.service}>, '
             f'oap side version is: {request.version}')
 
@@ -54,21 +56,20 @@ async def fetchAllPatterns(self, request, context):
         # https://github.com/apache/skywalking/blob/master/oap-server/ai-pipeline/src/main/java/org
         # /apache/skywalking/oap/server/ai/pipeline/services/HttpUriRecognitionService.java#LL39C32-L39C32
         if version == request.version:  # Initial version is NULL
-            print('Version match, returning empty response')
+            logger.info('Version match, returning empty response')
             return ai_http_uri_recognition_pb2.HttpUriRecognitionResponse(patterns=[], version=version)
 
-        print(f'Version do not match, local:{version} vs oap:{request.version}')
+        logger.info(f'Version do not match, local:{version} vs oap:{request.version}')
 
         cluster_candidates = self.shared_results_object.get_dict_field(request.service)
         patterns = []
+        count = 0
         for cluster in cluster_candidates:
             if '{var}' in cluster:
                 patterns.append(Pattern(pattern=cluster))
             else:  # TODO this is for post processing feature to be added
-                print("Skipping pattern without {var}, OAP won't need this")
-        print(f'Returning {len(patterns)} patterns')
-
-        print('-================-')
+                count += 1
+        logger.info(f'Returning {len(patterns)} patterns, ignore {count} patterns without var urls')
 
         return ai_http_uri_recognition_pb2.HttpUriRecognitionResponse(patterns=patterns, version=version)
 
@@ -78,7 +79,7 @@ async def feedRawData(self, request, context):
 
         There will always be a User service, its in topology, but it will not call fetchAllPatterns
         """
-        print(f'> Received feedRawData request for service {request.service}')
+        logger.info(f'> Received feedRawData request for service {request.service}')
         if request.service == 'User':
             # It should not be called
             return Empty()
@@ -89,7 +90,8 @@ async def feedRawData(self, request, context):
         # This is an experimental mechanism to avoid identifying non-restful uris unnecessarily.
         self.known_services[service] += len(set(uris))
         if self.known_services[service] < self.conf.drain_analysis_min_url_count:
-            print(f'Unique Uri count too low({self.known_services[service]} < {self.conf.drain_analysis_min_url_count}) for service {service}, skipping')
+            logger.info(
+                f'Unique Uri count too low({self.known_services[service]} < {self.conf.drain_analysis_min_url_count}) for service {service}, skipping')
             return Empty()
         self.uri_main_queue.put((uris, service))
         return Empty()
@@ -99,13 +101,14 @@ async def serve(uri_main_queue, shared_results_object, conf):
     server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
 
     ai_http_uri_recognition_pb2_grpc.add_HttpUriRecognitionServiceServicer_to_server(
-        HttpUriRecognitionServicer(uri_main_queue=uri_main_queue, shared_results_object=shared_results_object, conf=conf), server)
+        HttpUriRecognitionServicer(uri_main_queue=uri_main_queue, shared_results_object=shared_results_object,
+                                   conf=conf), server)
 
     server.add_insecure_port('[::]:17128')  # TODO: change to config injection
 
     await server.start()
 
-    print('Server started!')
+    logger.info('Server started at :17128!')
 
     await server.wait_for_termination()  # timeout=5
 
@@ -118,14 +121,15 @@ def run_server(uri_main_queue, shared_results_object, conf):
         sys.exit(loop.run_until_complete(serve(uri_main_queue, shared_results_object, conf)))
     except KeyboardInterrupt:
         # Optionally show a message if the shutdown may take a while
-        print("Attempting graceful shutdown, press Ctrl+C again to exit…", flush=True)
+        logger.info("Attempting graceful shutdown, press Ctrl+C again to exit…", flush=True)
 
         quit()
         # TODO Handle interrupt and gracefully shutdown
         """
         Learn from this
         https://stackoverflow.com/questions/30765606/whats-the-correct-way-to-clean-up-after-an-interrupted-event-loop
         """
+
         # Do not show `asyncio.CancelledError` exceptions during shutdown
         # (a lot of these may be generated, skip this if you prefer to see them)
         def shutdown_exception_handler(loop, context):
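The definition of shutdown_exception_handler is truncated above. Judging from the comment and the linked Stack Overflow thread, the intent is to suppress asyncio.CancelledError while the loop shuts down and defer anything else to the default handler; a sketch under that assumption:

import asyncio


def shutdown_exception_handler(loop, context):
    # CancelledError is expected while pending tasks are being cancelled.
    exc = context.get('exception')
    if isinstance(exc, asyncio.CancelledError):
        return
    loop.default_exception_handler(context)

It would be installed with loop.set_exception_handler(shutdown_exception_handler) before the loop finishes cancelling tasks.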
servers/simple/worker.py: 20 changes (11 additions, 9 deletions)
@@ -11,13 +11,16 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logger
 import queue
 import time
 from collections import defaultdict
 
 from models.uri_drain.persistence_handler import ServiceFilePersistenceHandler
 from models.uri_drain.template_miner import TemplateMiner
 
+logger = logger.init_logger(name=__name__)
+
 
 def create_defaultdict_with_key(factory):
     class CustomDefaultDict(defaultdict):
@@ -31,31 +34,30 @@ def __missing__(self, key):
 
 def run_worker(uri_main_queue, shared_results_object, config, existing_miners):
     drain_instances = create_defaultdict_with_key(lambda key:  # URIDrain instances
-                                                  TemplateMiner(ServiceFilePersistenceHandler(config.snapshot_file_dir, key) if config.snapshot_file_dir else None, config))
+                                                  TemplateMiner(ServiceFilePersistenceHandler(config.snapshot_file_dir,
+                                                                                              key) if config.snapshot_file_dir else None,
+                                                                config))
     for service in existing_miners:
         drain_instances[service] = existing_miners[service]
 
     counter = 0
     while True:
         try:
             uri_package = uri_main_queue.get()
-            print('====================')
-            print(f'currently have drain instances for {len(drain_instances)} services')
-            print(f'drain_instances.keys() = {drain_instances.keys()}')
-            print('-================-')
+            logger.info(
+                f'currently have drain instances for {len(drain_instances)} services, drain_instances.keys() = {drain_instances.keys()}, '
+                f'got uri package of length {len(uri_package[0])} for service <{uri_package[1]}>')
             uris, service = uri_package[0], uri_package[1]
             # print(uri_main_queue.get(timeout=1))
-            print(f'Got uri package of length {len(uri_package[0])} for service <{uri_package[1]}>')
             start_time = time.time()
             for uri in uris:
                 drain_instances[service].add_log_message(uri)
-            print(f'Processed {len(uris)} uris in {time.time() - start_time} seconds')
+            logger.info(f'Processed {len(uris)} uris of service {service} in {time.time() - start_time} seconds')
             patterns = drain_instances[service].drain.cluster_patterns
             shared_results_object.set_dict_field(service=service, value=patterns)  # TODO add version
             # increment here
             counter += 1
-            print('-================-')
         except Exception as e:
-            print(f"catch an unexpected error occurred: {e}")
+            logger.error(f"catch an unexpected error occurred: {e}")
         except queue.Empty:  # TODO Consider queue full
             pass
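The body of __missing__ is elided in the hunk above. The pattern create_defaultdict_with_key implements is a defaultdict whose factory receives the missing key (a plain defaultdict's default_factory takes no arguments), so each service gets a TemplateMiner built with its own name as the persistence key. A self-contained reconstruction of the pattern, with the miner swapped for a string for brevity:

from collections import defaultdict


def create_defaultdict_with_key(factory):
    class CustomDefaultDict(defaultdict):
        def __missing__(self, key):
            # Unlike default_factory, the factory here is called with the key.
            self[key] = value = factory(key)
            return value

    return CustomDefaultDict()


miners = create_defaultdict_with_key(lambda key: f'TemplateMiner for {key}')
print(miners['serviceA'])  # prints: TemplateMiner for serviceA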