Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revamp logging with RootLogHandler and new logger config #67

Merged
merged 9 commits into from
Sep 11, 2024
13 changes: 2 additions & 11 deletions examples/simple_worker/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
import os

from frinx.client.v2.frinx_conductor_wrapper import FrinxConductorWrapper
from frinx.common.logging import logging_common
from frinx.common.logging.logging_common import LoggerConfig
from frinx.common.logging.logging_common import Root
from frinx.common.logging.config import LoggerConfig


def register_tasks(conductor_client: FrinxConductorWrapper) -> None:
Expand All @@ -21,14 +19,7 @@ def register_workflows() -> None:

def main() -> None:

logging_common.configure_logging(
LoggerConfig(
root=Root(
level=os.environ.get('LOG_LEVEL', 'INFO').upper(),
handlers=['console']
)
)
)
LoggerConfig().setup_logging()

from frinx.common.telemetry.metrics import Metrics
from frinx.common.telemetry.metrics import MetricsSettings
Expand Down
36 changes: 36 additions & 0 deletions examples/simple_worker/workers/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,39 @@ def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]:
bytes=len(text.encode('utf-8'))
)
)

class Logs(WorkerImpl):
class WorkerDefinition(TaskDefinition):
name: str = 'TEST_logs'
description: str = 'testing purposes: logging'
labels: ListAny = ['TEST']
timeout_seconds: int = 60
response_timeout_seconds: int = 60

class WorkerInput(TaskInput):
...

class WorkerOutput(TaskOutput):
...

def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]:
import logging

from frinx.common.logging.root_logger import root_logger

module_logger = logging.getLogger(__name__)

module_logger.info('This is an INFO message from module_logger')
module_logger.debug('This is a DEBUG message from module_logger')
module_logger.warning('This is a WARNING message from module_logger')
module_logger.error('This is an ERROR message from module_logger')

root_logger.info('This is an INFO message from the root_logger')
root_logger.debug('This is a DEBUG message from the root_logger')
root_logger.warning('This is a WARNING message from the root_logger')
root_logger.error('This is an ERROR message from the root_logger')

return TaskResult(
status=TaskResultStatus.COMPLETED,
logs=['This is a log message from TaskResult.']
)
7 changes: 7 additions & 0 deletions examples/simple_worker/workers/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ class WorkflowOutput(WorkflowImpl.WorkflowOutput):
bytes: str

def workflow_builder(self, workflow_inputs: WorkflowInput) -> None:
generate_task = SimpleTask(
name=TestWorkers.Logs,
task_reference_name='logs',
input_parameters=SimpleTaskInputParameters(dict())
)
self.tasks.append(generate_task)

generate_task = SimpleTask(
name=TestWorkers.LoremIpsum,
task_reference_name='generate',
Expand Down
3 changes: 3 additions & 0 deletions frinx/client/v2/frinx_conductor_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from frinx.client.v2.conductor import WFClientMgr
from frinx.common.frinx_rest import CONDUCTOR_URL_BASE
from frinx.common.logging.root_logger import root_log_handler
from frinx.common.worker.worker import WorkerImpl

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -239,6 +240,7 @@ def register(self, task_blueprint: WorkerImpl) -> None:
def execute(self, task: RawTaskIO, task_blueprint: WorkerImpl) -> None:
try:
logger.info('Executing a task %s', task['taskId'])
root_log_handler.set_task_info_for_thread(str(task['taskType']), str(task['workflowInstanceId']))
resp = task_blueprint.execute_wrapper(task)

if resp is None:
Expand All @@ -248,6 +250,7 @@ def execute(self, task: RawTaskIO, task_blueprint: WorkerImpl) -> None:
task['status'] = resp['status']
task['outputData'] = resp.get('output', {})
task['logs'] = resp.get('logs', [])
task['logs'].extend(root_log_handler.get_logs())

logger.debug('Executing a task %s, response: %s', task['taskId'], resp)
logger.debug('Executing a task %s, task body: %s', task['taskId'], task)
Expand Down
7 changes: 5 additions & 2 deletions frinx/common/frinx_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,21 @@

# SET SERVICES URL VARIABLES
UNICONFIG_URL_BASE = os.getenv('UNICONFIG_URL_BASE', 'http://uniconfig:8181/rests')
CONDUCTOR_URL_BASE = os.getenv('CONDUCTOR_URL_BASE', 'http://workflow-proxy:8088/proxy/api')
CONDUCTOR_URL_BASE = os.getenv('CONDUCTOR_URL_BASE', 'http://conductor-server:8080/api')
INVENTORY_URL_BASE = os.getenv('INVENTORY_URL_BASE', 'http://inventory:8000/graphql')
INFLUXDB_URL_BASE = os.getenv('INFLUXDB_URL_BASE', 'http://influxdb:8086')
RESOURCE_MANAGER_URL_BASE = os.getenv('RESOURCE_MANAGER_URL_BASE', 'http://resource-manager:8884/query')
SCHELLAR_URL_BASE = os.getenv('SCHELLAR_URL_BASE', 'http://schellar:3000/query')
SCHELLAR_URL_BASE = os.getenv('SCHELLAR_URL_BASE', 'http://conductor-server:3000/query')
KRAKEND_URL_BASE = os.getenv('KRAKEND_URL_BASE', 'http://krakend:8080')
UNICONFIG_ZONE_URL_TEMPLATE = os.getenv('UNICONFIG_ZONE_URL_TEMPLATE', 'http://{uc}:8181/rests')

# URL HEADERS
UNICONFIG_HEADERS = MappingProxyType({'Content-Type': 'application/json'})
UNICONFIG_REQUEST_PARAMS: MappingProxyType[str, Any] = MappingProxyType({})

# SET SERVICES SETTINGS
UNICONFIG_KEY_DELIMITER = os.getenv('UNICONFIG_KEY_DELIMITER', '%22')

CONDUCTOR_HEADERS = MappingProxyType(
{
'Content-Type': 'application/json',
Expand Down
5 changes: 0 additions & 5 deletions frinx/common/logging/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +0,0 @@
from pathlib import Path

ROOT_DIR = Path(__file__).parent

LOGGING_CONFIG = Path(ROOT_DIR, 'logging-config.json')
58 changes: 58 additions & 0 deletions frinx/common/logging/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import logging.config

from frinx.common.logging.root_logger import root_log_handler
from frinx.common.logging.settings import LoggerSettings
from frinx.common.type_aliases import DictAny


class LoggerConfig:
"""
Configuration class for setting up logging.
"""
_setup_done = False
_logger_settings = LoggerSettings()

def __init__(self, level: str | None = None, handlers: list[str] | None = None):
self.level = level or self._logger_settings.DEFAULT_LOG_LEVEL
self.handlers = handlers or self._logger_settings.DEFAULT_HANDLERS

def setup_logging(self) -> None:
"""Set up logging configuration using dictConfig."""
if LoggerConfig._setup_done:
return # Prevent reconfiguration

logging.config.dictConfig(self.generate_logging_config())
logging.getLogger().addHandler(root_log_handler)
LoggerConfig._setup_done = True

def generate_logging_config(self) -> DictAny:
"""Generate the logging configuration dictionary."""
return {
'version': 1,
'disable_existing_loggers': True,
'formatters': {
'verbose_formatter': {
'format': self._logger_settings.LOG_FORMAT_VERBOSE,
'datefmt': '%Y-%m-%d %H:%M:%S',
},
'default_formatter': {
'format': self._logger_settings.LOG_FORMAT_DEFAULT,
'datefmt': '%Y-%m-%d %H:%M:%S',
},
},
'handlers': {
'file': {
'class': 'logging.handlers.RotatingFileHandler',
'filename': str(self._logger_settings.LOG_FILE_PATH),
'maxBytes': 10 * 1024 * 1024, # 10 MB
'backupCount': 10,
'level': self.level,
'formatter': 'verbose_formatter',
},
},
'root': {
'handlers': ['file'], # NOTE: The root_log_handler is attached automatically.
'level': self.level,
'propagate': False,
},
}
86 changes: 86 additions & 0 deletions frinx/common/logging/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import logging
import logging.config
import threading
from collections import deque

from frinx.common.logging.settings import LoggerSettings


class RootLogHandler(logging.Handler):
"""
A logging handler that manages log messages for both task-specific and general logging,
utilizing thread-specific queues for logs from tasks.

Attributes:
max_capacity (int): Maximum number of log messages to retain in the queue (for tasks).
max_message_length (int): Maximum length of each log message (for tasks).
thread_data (threading.local): Thread-local storage for log data (for tasks).
formatter (logging.Formatter): Formatter for internal log storage.
console_formatter (logging.Formatter): Formatter for console output.
console_handler (logging.StreamHandler): Handler for console output.
"""

_logger_settings = LoggerSettings()

def __init__(self, max_capacity: int = 100, max_message_length: int = 15000, level: int = logging.INFO) -> None:
super().__init__(level)
self._is_task: bool = False
self.max_capacity: int = max_capacity
self.max_message_length: int = max_message_length
self.thread_data = threading.local()
self.thread_data.log_queue = deque(maxlen=self.max_capacity)
self.console_formatter = logging.Formatter(self._logger_settings.LOG_FORMAT_DEFAULT, datefmt='%F %T')
self.console_handler = logging.StreamHandler()
self.console_handler.setFormatter(self.console_formatter)

def emit(self, record: logging.LogRecord) -> None:
"""
Process and emit a log record. Store the log in a thread-specific queue if the context is task-related.
"""

formatted_record: str = self.format(record)
truncated_record: str = self._truncate_message(formatted_record)

if self._is_task:
if not hasattr(self.thread_data, 'log_queue'):
self._setup_thread_logging()
self.thread_data.log_queue.append(truncated_record)
record.source = getattr(self.thread_data, 'source', record.name)
else:
record.source = record.name

self.console_handler.emit(record)

def _truncate_message(self, message: str) -> str:
"""Truncate a message if it exceeds the maximum message length."""
if len(message) > self.max_message_length:
return message[:self.max_message_length] + '... [truncated]'
return message

def _setup_thread_logging(self) -> None:
"""Setup thread-specific logging."""
self.thread_data.log_queue = deque(maxlen=self.max_capacity)

def set_task_info_for_thread(self, *args: str) -> None:
"""Set task-specific information for the current thread. """
self._is_task = True
delimiter: str = ' '
self.thread_data.source = delimiter.join(str(arg) for arg in args)

def get_logs(self, clear: bool = True) -> list[str]:
"""Retrieve and optionally clear the logs stored in the current thread's queue."""
if not hasattr(self.thread_data, 'log_queue'):
return []

logs: list[str] = list(self.thread_data.log_queue)

if clear:
self.thread_data.log_queue.clear()
self._clear_taskname_for_thread()

return logs

def _clear_taskname_for_thread(self) -> None:
"""Clear the task-specific information for the current thread."""
if hasattr(self.thread_data, 'task_name'):
del self.thread_data.task_name
37 changes: 0 additions & 37 deletions frinx/common/logging/logging-config.json

This file was deleted.

Loading
Loading