Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add performance tracing #204

Merged
merged 28 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ec86075
Refactor tracing with smaller structures.
ehpor May 20, 2024
a651235
Add protobuf conversion for all trace events.
ehpor May 21, 2024
a5507bc
Add Python bindings and convenience functions for tracing proxy.
ehpor May 21, 2024
93a7cc4
Temporarily disable printing of log messages.
ehpor May 21, 2024
01416e7
Start trace event distributor to testbed.
ehpor May 21, 2024
543e740
Add temporary example for testing tracing.
ehpor May 21, 2024
d624b70
Extract and generalize the LogDistributor into a ZmqDistributor.
ehpor May 21, 2024
8c6c886
Enable connecting/disconnecting outside of the constructor/destructor.
ehpor May 21, 2024
2670b61
Don't reconnect to the same host/port combination.
ehpor May 21, 2024
c0a8527
Fix bindings for tracing proxy constructor change.
ehpor May 21, 2024
a637c1a
Refactor to enable global access to trace.
ehpor May 21, 2024
1edac93
Include process ID in counter traces.
ehpor May 21, 2024
37d2811
Rename tracing C++ file.
ehpor May 21, 2024
be1d665
Add process and thread name to traces.
ehpor May 22, 2024
b4f4d75
Add tracing for data streams.
ehpor May 22, 2024
d2b9caf
Add docstrings.
ehpor Jun 4, 2024
a327523
Remove unused import.
ehpor Aug 23, 2024
edb1dbe
Add actual trace test script.
ehpor Aug 23, 2024
d54dba6
Make sure that trace events are not logged before connected.
ehpor Aug 23, 2024
c2d8b29
Make sure the ZmqDistributor is running as expected after start is ca…
ehpor Aug 23, 2024
07bf4b0
Increase sleep times for windows slow-joiner problems.
ehpor Aug 23, 2024
56606bb
Use non-hardcoded ports for trace distribution.
ehpor Aug 23, 2024
a3dcb73
Fix hardcoded port.
ehpor Aug 29, 2024
b5c71d8
Make trace for SubmitFrame() and RequestNewFrame() instant.
ehpor Aug 29, 2024
51d700f
Add comments and docstrings.
ehpor Aug 30, 2024
d2fd020
Move SetProcessName() call up to avoid race condition.
ehpor Aug 30, 2024
95a273c
Use a manual variable for IsConnected for performance reasons.
ehpor Aug 30, 2024
310fa15
Make sure to set IsConnected to true after connecting.
ehpor Aug 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion catkit2/testbed/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,19 @@
'CatkitLogHandler',
'TestbedProxy',
'ServiceProxy',
'Experiment'
'Experiment',
'TraceWriter',
'trace_interval',
'trace_instant',
'trace_counter',
'ZmqDistributor',
]

from .testbed import *
from .experiment import *
from .service import *
from .logging import *
from .tracing import *
from .distributor import *
from .testbed_proxy import *
from .service_proxy import *
86 changes: 86 additions & 0 deletions catkit2/testbed/distributor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import zmq
import threading
import traceback


class ZmqDistributor:
    '''Collect messages on one port and re-publish them on another.

    Messages are received on a PULL socket bound to `input_port` and are
    re-sent unchanged on a PUB socket bound to `output_port`. This operates
    on a separate thread after it is started.

    Parameters
    ----------
    context : zmq.Context
        A previously-created ZMQ context. All sockets will be created on this context.
    input_port : integer
        The port number for the incoming messages.
    output_port : integer
        The port number for the outgoing messages.
    callback : function
        An optional callback to call with each forwarded multipart message.
    '''
    def __init__(self, context, input_port, output_port, callback=None):
        self.context = context
        self.input_port = input_port
        self.output_port = output_port
        self.callback = callback

        self.shutdown_flag = threading.Event()
        self.thread = None

        # Set by the thread once both sockets are bound; cleared by stop().
        self.is_running = threading.Event()

        # Set by the thread when socket setup has finished, whether it
        # succeeded or not, so that start() can never wait forever.
        self._setup_done = threading.Event()
        self._setup_error = None

    def start(self):
        '''Start the proxy thread.

        This function waits until the thread has bound both of its sockets.
        Calling it while the thread is already running does nothing.

        Raises
        ------
        RuntimeError
            If the sockets could not be created or bound.
        '''
        if self.thread is not None and self.thread.is_alive():
            return

        # Reset the state left behind by a previous start()/stop() cycle,
        # otherwise the new thread would exit its loop immediately.
        self.shutdown_flag.clear()
        self._setup_done.clear()
        self._setup_error = None

        self.thread = threading.Thread(target=self._forwarder)
        self.thread.start()

        self._setup_done.wait()

        if self._setup_error is not None:
            # The thread is already on its way out; collect it before raising.
            self.thread.join()
            self.thread = None

            raise RuntimeError('Unable to start the distributor.') from self._setup_error

    def stop(self):
        '''Stop the proxy thread.

        This function waits until the thread is actually stopped.
        '''
        self.shutdown_flag.set()

        if self.thread:
            self.thread.join()

        self.is_running.clear()

    def _forwarder(self):
        '''Create sockets and republish all received messages.

        .. note::
            This function should not be called directly. Use
            :func:`~catkit2.testbed.ZmqDistributor.start()` to start the proxy.
        '''
        collector = None
        publicist = None

        try:
            try:
                collector = self.context.socket(zmq.PULL)
                collector.RCVTIMEO = 50
                collector.bind(f'tcp://*:{self.input_port}')

                publicist = self.context.socket(zmq.PUB)
                publicist.bind(f'tcp://*:{self.output_port}')

                self.is_running.set()
            except Exception as e:
                # Hand the failure over to start(), which re-raises it on
                # the calling thread.
                self._setup_error = e
                return
            finally:
                self._setup_done.set()

            while not self.shutdown_flag.is_set():
                try:
                    try:
                        message = collector.recv_multipart()
                        publicist.send_multipart(message)
                    except zmq.ZMQError as e:
                        if e.errno == zmq.EAGAIN:
                            # Timed out.
                            continue
                        else:
                            raise RuntimeError('Error during receive') from e

                    if self.callback:
                        self.callback(message)
                except Exception:
                    # Something went wrong during handling of the message.
                    # Let's ignore this error, but still print the exception.
                    print(traceback.format_exc())
        finally:
            # Always release the sockets, also after a partial setup, so the
            # ports are freed and the context can be terminated later on.
            for sock in (collector, publicist):
                if sock is not None:
                    sock.close()
74 changes: 0 additions & 74 deletions catkit2/testbed/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import zmq
import json
import contextlib
import traceback
from colorama import Fore, Back, Style

from ..catkit_bindings import submit_log_entry, Severity
Expand All @@ -27,79 +26,6 @@ def emit(self, record):

submit_log_entry(filename, line, function, severity, message)

class LogDistributor:
    '''Collect log messages on one port and re-publish them on another.

    Each received message is re-sent unchanged and additionally printed to
    standard output. This operates on a separate thread after it is started.

    Parameters
    ----------
    context : zmq.Context
        A previously-created ZMQ context. All sockets will be created on this context.
    input_port : integer
        The port number for the incoming log messages.
    output_port : integer
        The port number for the outgoing log messages.
    '''
    def __init__(self, context, input_port, output_port):
        self.context = context
        self.input_port = input_port
        self.output_port = output_port

        self.shutdown_flag = threading.Event()
        self.thread = None

    def start(self):
        '''Start the proxy thread.
        '''
        self.thread = threading.Thread(target=self.forwarder)
        self.thread.start()

    def stop(self):
        '''Stop the proxy thread.

        This function waits until the thread is actually stopped.
        '''
        self.shutdown_flag.set()

        if self.thread:
            self.thread.join()

    def forwarder(self):
        '''Create sockets and republish all received log messages.

        .. note::
            This function should not be called directly. Use
            :func:`~catkit2.testbed.LogDistributor.start` to start the proxy.
        '''
        collector = None
        publicist = None

        try:
            collector = self.context.socket(zmq.PULL)
            collector.RCVTIMEO = 50
            collector.bind(f'tcp://*:{self.input_port}')

            publicist = self.context.socket(zmq.PUB)
            publicist.bind(f'tcp://*:{self.output_port}')

            while not self.shutdown_flag.is_set():
                try:
                    try:
                        log_message = collector.recv_multipart()
                        publicist.send_multipart(log_message)
                    except zmq.ZMQError as e:
                        if e.errno == zmq.EAGAIN:
                            # Timed out.
                            continue
                        else:
                            raise RuntimeError('Error during receive') from e

                    # The first frame holds the JSON-encoded log entry.
                    log_message = log_message[0].decode('utf-8')
                    log_message = json.loads(log_message)

                    print(f'[{log_message["service_id"]}] {log_message["message"]}')
                except Exception:
                    # Something went wrong during handling of the log message.
                    # Let's ignore this error, but still print the exception.
                    print(traceback.format_exc())
        finally:
            # Release the sockets when the thread exits, so the ports are
            # freed and the context can be terminated later on.
            for sock in (collector, publicist):
                if sock is not None:
                    sock.close()

class LogObserver:
def __init__(self, host, port):
self.context = zmq.Context()
Expand Down
32 changes: 30 additions & 2 deletions catkit2/testbed/testbed.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from ..catkit_bindings import LogForwarder, Server, ServiceState, DataStream, get_timestamp, is_alive_state, Client
from .logging import *
from .distributor import ZmqDistributor

from ..proto import testbed_pb2 as testbed_proto
from ..proto import service_pb2 as service_proto
Expand Down Expand Up @@ -167,11 +168,13 @@ def __init__(self, port, is_simulated, config):
self.config = config

self.services = {}
self.launched_processes = []

self.log_distributor = None
self.log_handler = None
self.log_forwarder = None
self.launched_processes = []

self.tracing_distributor = None

self.log = logging.getLogger(__name__)

Expand Down Expand Up @@ -271,6 +274,9 @@ def run(self):
self.start_log_distributor()
self.setup_logging()

# Start tracing distributor.
self.start_tracing_distributor()

heartbeat_thread = threading.Thread(target=self.do_heartbeats)
heartbeat_thread.start()

Expand Down Expand Up @@ -316,6 +322,9 @@ def run(self):
# Shut down the server.
self.server.stop()

# Stop tracing distributor.
self.stop_tracing_distributor()

# Stop the logging.
self.destroy_logging()
self.stop_log_distributor()
Expand Down Expand Up @@ -392,9 +401,15 @@ def destroy_logging(self):
def start_log_distributor(self):
    '''Start the log distributor.

    Two unused ports are requested for the ingress and egress side, and a
    :class:`ZmqDistributor` is started on them. Every forwarded log message
    is also printed to standard output.
    '''
    def callback(log_message):
        # The first frame of the multipart message holds the JSON-encoded
        # log entry.
        log_message = log_message[0].decode('utf-8')
        log_message = json.loads(log_message)

        print(f'[{log_message["service_id"]}] {log_message["message"]}')

    self.logging_ingress_port, self.logging_egress_port = get_unused_port(num_ports=2)

    self.log_distributor = ZmqDistributor(self.context, self.logging_ingress_port, self.logging_egress_port, callback)
    self.log_distributor.start()

def stop_log_distributor(self):
Expand All @@ -404,6 +419,19 @@ def stop_log_distributor(self):
self.log_distributor.stop()
self.log_distributor = None

def start_tracing_distributor(self):
    '''Start the tracing distributor.

    Two unused ports are requested, stored as the tracing ingress and egress
    ports, and a :class:`ZmqDistributor` without callback is started on them.
    '''
    ingress_port, egress_port = get_unused_port(num_ports=2)
    self.tracing_ingress_port = ingress_port
    self.tracing_egress_port = egress_port

    distributor = ZmqDistributor(self.context, ingress_port, egress_port)

    # Store the distributor before starting it, so it is reachable from
    # stop_tracing_distributor() even if start() fails.
    self.tracing_distributor = distributor
    distributor.start()

def stop_tracing_distributor(self):
    '''Stop the tracing distributor, if there is one.

    The distributor is stopped and the reference to it is dropped. Nothing
    happens when no tracing distributor is set.
    '''
    distributor = self.tracing_distributor

    if not distributor:
        return

    distributor.stop()
    self.tracing_distributor = None

def on_start_service(self, data):
request = testbed_proto.StartServiceRequest()
request.ParseFromString(data)
Expand Down
Loading
Loading