Skip to content

Commit

Permalink
Merge branch 'main' into fix-proto-version
Browse files Browse the repository at this point in the history
  • Loading branch information
mrproliu authored Sep 1, 2024
2 parents 3903f83 + db7bf13 commit 13432bd
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 32 deletions.
2 changes: 1 addition & 1 deletion logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
def get_logger(name=None): # noqa
_logger = logging.getLogger(name)
ch = logging.StreamHandler()
formatter = logging.Formatter('%(name)s [%(levelname)s] %(message)s')
formatter = logging.Formatter('%(asctime)s - %(name)s - Thread-%(process)d - [%(levelname)s] %(message)s')
ch.setFormatter(formatter)
_logger.addHandler(ch)
_logger.propagate = False
Expand Down
8 changes: 8 additions & 0 deletions models/uri_drain/persistence_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@ def save_state(self, state):
def load_state(self):
pass

@abstractmethod
def get_service(self):
    """Return the service associated with this handler.

    Abstract hook: the base implementation does nothing and yields ``None``;
    concrete handlers are expected to override it.
    """


class ServiceFilePersistenceHandler(PersistenceHandler):

def __init__(self, base_dir, service):
self.service_name = service
self.file_path = os.path.join(base_dir, 'services', base64.b64encode(service.encode('utf-8')).decode('utf-8'))
path = Path(self.file_path)
path.parent.mkdir(parents=True, exist_ok=True)
Expand All @@ -32,6 +37,9 @@ def load_state(self):
with open(self.file_path, 'rb') as file:
return file.read()

def get_service(self):
    """Return the value stored in ``self.service_name``."""
    service_name = self.service_name
    return service_name


class ServicePersistentLoader:

Expand Down
15 changes: 9 additions & 6 deletions models/uri_drain/template_miner.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# SPDX-License-Identifier: MIT

import base64
import logging
import logger
import re
import time
import zlib
Expand All @@ -19,7 +19,7 @@
from models.uri_drain.uri_drain import Drain, LogCluster
from models.utils.simple_profiler import SimpleProfiler, NullProfiler, Profiler

logger = logging.getLogger(__name__)
logger = logger.init_logger(name=__name__)

config_filename = 'drain3.ini'

Expand All @@ -32,7 +32,7 @@ def load_existing_miners(config: TemplateMinerConfig = None):
existing_services = ServicePersistentLoader(config.snapshot_file_dir).load_services()
miners = defaultdict(TemplateMiner)
if len(existing_services) > 0:
print(f'Detected {len(existing_services)} services from disk')
logger.info(f'Detected {len(existing_services)} services from disk')
for service in existing_services:
miners[service] = TemplateMiner(ServiceFilePersistenceHandler(config.snapshot_file_dir, service), config)
return miners
Expand All @@ -48,7 +48,10 @@ def __init__(self,
:param persistence_handler: The type of persistence to use. When None, no persistence is applied.
:param config: Configuration object. When none, configuration is loaded from default .ini file (if exist)
"""
logger.info("Starting Drain3 template miner")
service = ""
if persistence_handler is not None:
service = persistence_handler.get_service()
logger.info(f"Starting Drain3 template miner of service {service}")

if config is None:
logger.info(f"Loading configuration from {config_filename}")
Expand Down Expand Up @@ -99,11 +102,11 @@ def __init__(self,
self.load_state()

def load_state(self):
logger.info("Checking for saved state")
logger.info(f"Checking for saved state of service {self.persistence_handler.get_service()}")

state = self.persistence_handler.load_state()
if state is None or state == b'':
logger.info("Saved state not found")
logger.info(f"Saved state not found of service {self.persistence_handler.get_service()}")
return

if self.config.snapshot_compress_state:
Expand Down
10 changes: 7 additions & 3 deletions servers/simple/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,19 @@
import multiprocessing
import os
from os.path import dirname
import logger

from models.uri_drain.template_miner import load_existing_miners
from models.uri_drain.template_miner_config import TemplateMinerConfig
from servers.simple.results_manager import ProxyURIDrainResultsManager, URIDrainResults
from servers.simple.server import run_server
from servers.simple.worker import run_worker

logger = logger.init_logger(logging_level='INFO', name=__name__)


def run():
print('Starting server from entrypoint...')
logger.info('Starting server from entrypoint...')
ProxyURIDrainResultsManager.register("URIDrainResults", URIDrainResults)

manager = ProxyURIDrainResultsManager()
Expand All @@ -32,7 +35,7 @@ def run():
# Load config
config = TemplateMinerConfig()
config_file = os.path.join(dirname(__file__), "uri_drain.ini")
print(f'Searching for config file at {config_file}')
logger.info(f'Searching for config file at {config_file}')
config.load(config_filename=config_file) # change to config injection from env or other

# SET DEBUG HERE! < TODO CONFIG FILE
Expand All @@ -45,7 +48,8 @@ def run():
shared_results_object.set_dict_field(service=service, value=miners[service].drain.cluster_patterns)

producer_process = multiprocessing.Process(target=run_server, args=(uri_main_queue, shared_results_object, config))
consumer_process = multiprocessing.Process(target=run_worker, args=(uri_main_queue, shared_results_object, config, miners))
consumer_process = multiprocessing.Process(target=run_worker,
args=(uri_main_queue, shared_results_object, config, miners))

producer_process.start()
consumer_process.start()
Expand Down
30 changes: 17 additions & 13 deletions servers/simple/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
from concurrent import futures

import grpc
import logger
from google.protobuf.empty_pb2 import Empty

from servers.protos.generated import ai_http_uri_recognition_pb2
from servers.protos.generated import ai_http_uri_recognition_pb2_grpc
from servers.protos.generated.ai_http_uri_recognition_pb2 import Pattern

logger = logger.init_logger(name=__name__)


class HttpUriRecognitionServicer(ai_http_uri_recognition_pb2_grpc.HttpUriRecognitionServiceServicer):
"""
Expand All @@ -42,8 +45,7 @@ def __init__(self, uri_main_queue, shared_results_object, conf):
async def fetchAllPatterns(self, request, context):
# TODO OAP SIDE OR THIS SIDE must save the version, e.g. oap should check if version is > got version, since
# this is a stateful service and it may crash and restart
print('-================-')
print(
logger.info(
f'> Received fetchAllPatterns request for service <{request.service}>, '
f'oap side version is: {request.version}')

Expand All @@ -54,21 +56,20 @@ async def fetchAllPatterns(self, request, context):
# https://github.com/apache/skywalking/blob/master/oap-server/ai-pipeline/src/main/java/org
# /apache/skywalking/oap/server/ai/pipeline/services/HttpUriRecognitionService.java#LL39C32-L39C32
if version == request.version: # Initial version is NULL
print('Version match, returning empty response')
logger.info('Version match, returning empty response')
return ai_http_uri_recognition_pb2.HttpUriRecognitionResponse(patterns=[], version=version)

print(f'Version do not match, local:{version} vs oap:{request.version}')
logger.info(f'Version do not match, local:{version} vs oap:{request.version}')

cluster_candidates = self.shared_results_object.get_dict_field(request.service)
patterns = []
count = 0
for cluster in cluster_candidates:
if '{var}' in cluster:
patterns.append(Pattern(pattern=cluster))
else: # TODO this is for post processing feature to be added
print("Skipping pattern without {var}, OAP won't need this")
print(f'Returning {len(patterns)} patterns')

print('-================-')
count += 1
logger.info(f'Returning {len(patterns)} patterns, ignore {count} patterns without var urls')

return ai_http_uri_recognition_pb2.HttpUriRecognitionResponse(patterns=patterns, version=version)

Expand All @@ -78,7 +79,7 @@ async def feedRawData(self, request, context):
There will always be a User service, its in topology, but it will not call fetchAllPatterns
"""
print(f'> Received feedRawData request for service {request.service}')
logger.info(f'> Received feedRawData request for service {request.service}')
if request.service == 'User':
# It should not be called
return Empty()
Expand All @@ -89,7 +90,8 @@ async def feedRawData(self, request, context):
# This is an experimental mechanism to avoid identifying non-restful uris unnecessarily.
self.known_services[service] += len(set(uris))
if self.known_services[service] < self.conf.drain_analysis_min_url_count:
print(f'Unique Uri count too low({self.known_services[service]} < {self.conf.drain_analysis_min_url_count}) for service {service}, skipping')
logger.info(
f'Unique Uri count too low({self.known_services[service]} < {self.conf.drain_analysis_min_url_count}) for service {service}, skipping')
return Empty()
self.uri_main_queue.put((uris, service))
return Empty()
Expand All @@ -99,13 +101,14 @@ async def serve(uri_main_queue, shared_results_object, conf):
server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))

ai_http_uri_recognition_pb2_grpc.add_HttpUriRecognitionServiceServicer_to_server(
HttpUriRecognitionServicer(uri_main_queue=uri_main_queue, shared_results_object=shared_results_object, conf=conf), server)
HttpUriRecognitionServicer(uri_main_queue=uri_main_queue, shared_results_object=shared_results_object,
conf=conf), server)

server.add_insecure_port('[::]:17128') # TODO: change to config injection

await server.start()

print('Server started!')
logger.info('Server started at :17128!')

await server.wait_for_termination() # timeout=5

Expand All @@ -118,14 +121,15 @@ def run_server(uri_main_queue, shared_results_object, conf):
sys.exit(loop.run_until_complete(serve(uri_main_queue, shared_results_object, conf)))
except KeyboardInterrupt:
# Optionally show a message if the shutdown may take a while
print("Attempting graceful shutdown, press Ctrl+C again to exit…", flush=True)
logger.info("Attempting graceful shutdown, press Ctrl+C again to exit…", flush=True)

quit()
# TODO Handle interrupt and gracefully shutdown
"""
Learn from this
https://stackoverflow.com/questions/30765606/whats-the-correct-way-to-clean-up-after-an-interrupted-event-loop
"""

# Do not show `asyncio.CancelledError` exceptions during shutdown
# (a lot of these may be generated, skip this if you prefer to see them)
def shutdown_exception_handler(loop, context):
Expand Down
20 changes: 11 additions & 9 deletions servers/simple/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logger
import queue
import time
from collections import defaultdict

from models.uri_drain.persistence_handler import ServiceFilePersistenceHandler
from models.uri_drain.template_miner import TemplateMiner

logger = logger.init_logger(name=__name__)


def create_defaultdict_with_key(factory):
class CustomDefaultDict(defaultdict):
Expand All @@ -31,31 +34,30 @@ def __missing__(self, key):

def run_worker(uri_main_queue, shared_results_object, config, existing_miners):
    """Consume URI batches from the queue and publish per-service patterns.

    Loops forever. Each item taken from ``uri_main_queue`` is read as
    ``(uris, service)``; every URI is fed to that service's ``TemplateMiner``
    via ``add_log_message``, and the miner's ``drain.cluster_patterns`` is
    then written to ``shared_results_object`` under the service name.

    :param uri_main_queue: queue whose ``get()`` yields ``(uris, service)`` items.
    :param shared_results_object: object exposing
        ``set_dict_field(service=..., value=...)``.
    :param config: miner configuration; a truthy ``snapshot_file_dir`` makes
        newly created miners use a ``ServiceFilePersistenceHandler``,
        otherwise they are created without persistence.
    :param existing_miners: mapping of service name to an already constructed
        miner; these are reused instead of creating fresh ones.
    """

    def new_miner(key):
        # Persist to disk only when a snapshot directory is configured.
        handler = None
        if config.snapshot_file_dir:
            handler = ServiceFilePersistenceHandler(config.snapshot_file_dir, key)
        return TemplateMiner(handler, config)

    drain_instances = create_defaultdict_with_key(new_miner)  # URIDrain instances
    for service in existing_miners:
        drain_instances[service] = existing_miners[service]

    counter = 0
    while True:
        try:
            uri_package = uri_main_queue.get()
            logger.info(
                f'currently have drain instances for {len(drain_instances)} services, '
                f'drain_instances.keys() = {drain_instances.keys()}, '
                f'got uri package of length {len(uri_package[0])} for service <{uri_package[1]}>')
            uris, service = uri_package[0], uri_package[1]
            # print(uri_main_queue.get(timeout=1))
            start_time = time.time()
            for uri in uris:
                drain_instances[service].add_log_message(uri)
            logger.info(f'Processed {len(uris)} uris of service {service} in {time.time() - start_time} seconds')
            patterns = drain_instances[service].drain.cluster_patterns
            shared_results_object.set_dict_field(service=service, value=patterns)  # TODO add version
            # increment here
            counter += 1
        except queue.Empty:  # TODO Consider queue full
            # Listed before the generic handler: queue.Empty subclasses
            # Exception, so placed after it this clause could never run.
            pass
        except Exception as e:
            # Keep the worker alive on any per-batch failure; just report it.
            logger.error(f"catch an unexpected error occurred: {e}")

0 comments on commit 13432bd

Please sign in to comment.