Skip to content

Commit

Permalink
Hooked up cactus rt with tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
shuhaowu committed Jul 29, 2023
1 parent 2bab8cf commit 3f15fe5
Show file tree
Hide file tree
Showing 12 changed files with 185 additions and 66 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
build/
.cache/

data.perfetto
.vscode/settings.json
6 changes: 6 additions & 0 deletions examples/tracing_protos_example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Tracing protos example
======================

This is just a sample code showing that it is possible to generate
[Perfetto](https://perfetto.dev/) events without using the Perfetto library. The
code here follows [this tutorial](https://perfetto.dev/docs/reference/synthetic-track-event).
27 changes: 23 additions & 4 deletions include/cactus_rt/app.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "config.h"
#include "quill/Quill.h"
#include "thread.h"
#include "tracing/trace_aggregator.h"

namespace cactus_rt {

Expand All @@ -17,13 +18,17 @@ class App {
// The name of the app
const char* name_;

// Size of heap to reserve in bytes at program startup.
size_t heap_size_;

// Configuration for quill logging
quill::Config logger_config_;

// Size of heap to reserve in bytes at program startup.
size_t heap_size_;
TracerConfig tracer_config_;

std::vector<std::shared_ptr<Thread>> threads_;

std::vector<std::shared_ptr<BaseThread>> threads_;
tracing::TraceAggregator trace_aggregator_;

void SetDefaultLogFormat(quill::Config& cfg) {
// Create a handler of stdout
Expand Down Expand Up @@ -56,14 +61,21 @@ class App {
*
* @param thread A shared ptr to a thread.
*/
void RegisterThread(std::shared_ptr<BaseThread> thread);
void RegisterThread(std::shared_ptr<Thread> thread);

/**
* @brief Starts the app by locking the memory and reserving the memory. Also
* start all the threads in registration order.
*/
virtual void Start();

/**
* @brief Request stops for threads started by the cactus rt framework
*
* This is useful if you need a custom thread shutdown order.
*/
void RequestStopForSystemThreads();

/**
* @brief sends RequestStop to all threads in registration order.
*
Expand All @@ -72,6 +84,13 @@ class App {
*/
virtual void RequestStop();

/**
* @brief Only join the threads started by the cactus rt framework
*
* This is useful if you need a custom thread shutdown order.
*/
void JoinSystemThreads();

/**
* @brief Joins all the threads in registration order.
*
Expand Down
51 changes: 43 additions & 8 deletions include/cactus_rt/config.h
Original file line number Diff line number Diff line change
@@ -1,24 +1,53 @@
#ifndef CACTUS_RT_CONFIG_H_
#define CACTUS_RT_CONFIG_H_

#include <variant>
#include <quill/Quill.h>

#include "quill/Quill.h"
#include <string>
#include <variant>
#include <vector>

namespace cactus_rt {

/**
* @brief Config for the tracer
*/
struct TracerConfig {
// TODO: custom sink support to skip the output file
/**
* @brief The output filename for the trace packet
*/
std::string trace_output_filename = "data.perfetto";

/**
* @brief The CPU configuration for the tracer's background processing thread
*/
std::vector<size_t> trace_aggregator_cpu_affinity;
};

/**
* @brief The configuration required for an App
*/
struct AppConfig {
// The name of the app
char* name;
/**
* @brief The name of the app
*/
const char* name = "RTApp";

/**
* @brief Size of heap to reserve in bytes at program startup.
*/
size_t heap_size = 0;

// Configuration for quill logging
/**
* @brief The configuration for quill logging
*/
quill::Config logger_config;

// Size of heap to reserve in bytes at program startup.
size_t heap_size = 0;
/**
* @brief The config for the tracer if enabled (ENABLE_TRACING option in cmake)
*/
TracerConfig tracer_config;
};

/**
Expand All @@ -44,12 +73,16 @@ struct DeadlineThreadConfig {
uint64_t sched_period_ns;
};

struct ThreadTracerConfig {
uint32_t queue_size = 16384;
};

/**
* @brief The configuration required for a thread
*/
struct ThreadConfig {
// The name of the thread
std::string name;
const char* name = "Thread";

// A vector of CPUs this thread should run on. If empty, no CPU restrictions are set.
std::vector<size_t> cpu_affinity = {};
Expand All @@ -59,6 +92,8 @@ struct ThreadConfig {

// The configuration for the scheduler (SCHED_OTHER, SCHED_FIFO, or SCHED_DEADLINE)
std::variant<OtherThreadConfig, FifoThreadConfig, DeadlineThreadConfig> scheduler_config;

ThreadTracerConfig tracer_config;
};

struct CyclicThreadConfig : ThreadConfig {
Expand Down
33 changes: 24 additions & 9 deletions include/cactus_rt/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

#include "config.h"
#include "quill/Quill.h"
#include "tracing/thread_tracer.h"
#include "tracing/trace_aggregator.h"

namespace cactus_rt {

Expand Down Expand Up @@ -94,7 +96,12 @@ class Thread : public BaseThread {

SchedulerConfigVariant scheduler_config_;

quill::Logger* logger_;
quill::Logger* logger_;
std::shared_ptr<tracing::ThreadTracer> tracer_;

// Non-owning trace aggregator pointer. Used only for notifying that
// the thread has started.
tracing::TraceAggregator* trace_aggregator_ = nullptr;

pthread_t thread_;
int64_t start_monotonic_time_ns_ = 0;
Expand All @@ -116,16 +123,15 @@ class Thread : public BaseThread {
cpu_affinity_(config.cpu_affinity),
stack_size_(static_cast<size_t>(PTHREAD_STACK_MIN) + config.stack_size),
scheduler_config_(config.scheduler_config),
logger_(quill::create_logger(name_)) {}
logger_(quill::create_logger(name_)),
tracer_(std::make_shared<tracing::ThreadTracer>(config.name, config.tracer_config.queue_size)) {}

/**
* Returns the name of the thread
*
* @returns The name of the thread.
*/
inline const std::string& Name() override {
return name_;
}
inline const std::string& Name() override { return name_; }

/**
* Starts the thread in the background.
Expand All @@ -141,12 +147,21 @@ class Thread : public BaseThread {
*/
int Join() override;

/**
* @brief Sets the trace_aggregator_ pointer so the thread can notify the
* trace_aggregator_ when it starts. This should only be called by App.
*
* @private
*/
inline void SetTraceAggregator(tracing::TraceAggregator* trace_aggregator) {
trace_aggregator_ = trace_aggregator;
}

protected:
inline quill::Logger* Logger() const { return logger_; }
inline SchedulerConfigVariant SchedulerConfig() {
return scheduler_config_;
}
inline int64_t StartMonotonicTimeNs() const { return start_monotonic_time_ns_; }
inline SchedulerConfigVariant SchedulerConfig() { return scheduler_config_; }
inline tracing::ThreadTracer& Tracer() { return *tracer_; }
inline int64_t StartMonotonicTimeNs() const { return start_monotonic_time_ns_; }

/**
* Override this method to do work. If this is a real-time thread, once this
Expand Down
17 changes: 10 additions & 7 deletions include/cactus_rt/tracing/thread_tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,16 @@ class ThreadTracer {
TraceSpan WithSpan(const char* name, const char* category = nullptr) noexcept;
bool InstantEvent(const char* name, const char* category = nullptr) noexcept;

inline EventCountData EventCount() const noexcept {
return event_count_.Read();
}

inline size_t QueueCapacity() const noexcept {
return queue_capacity_;
}
inline EventCountData EventCount() const noexcept { return event_count_.Read(); }
inline size_t QueueCapacity() const noexcept { return queue_capacity_; }

/**
* @brief Get and memorize the thread id. Should only be called from the
* thread by Thread::RunThread.
*
* @private
*/
void SetTid() noexcept { tid_ = gettid(); }

private:
template <typename... Args>
Expand Down
13 changes: 5 additions & 8 deletions include/cactus_rt/tracing/trace_aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,14 @@
#include "thread_tracer.h"

namespace cactus_rt::tracing {
struct TraceAggregatorConfig {
const char* process_name;
std::vector<size_t> cpu_affinity;
};

class TraceAggregator {
using Trace = cactus_tracing::vendor::perfetto::protos::Trace;

TraceAggregatorConfig config_;
uint64_t process_track_uuid_;
quill::Logger* logger_;
const char* process_name_;
std::vector<size_t> cpu_affinity_;
uint64_t process_track_uuid_;
quill::Logger* logger_;

// We use a std::thread and not a cactus_rt::Thread as cactus_rt::Thread has
// dependency on this class, so we cannot have a circular dependency.
Expand Down Expand Up @@ -53,7 +50,7 @@ class TraceAggregator {
std::deque<Trace> sticky_trace_packets_;

public:
explicit TraceAggregator(TraceAggregatorConfig config);
explicit TraceAggregator(const char* name, std::vector<size_t> cpu_affinity);

// No copy no move
TraceAggregator(const TraceAggregator&) = delete;
Expand Down
4 changes: 2 additions & 2 deletions include/cactus_rt/tracing/tracing_enabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
namespace cactus_rt::tracing {
extern std::atomic_bool tracing_enabled;

void EnableTracing() noexcept {
inline void EnableTracing() noexcept {
tracing_enabled.store(true, std::memory_order_relaxed);
}

void DisableTracing() noexcept {
inline void DisableTracing() noexcept {
tracing_enabled.store(false, std::memory_order_relaxed);
}

Expand Down
32 changes: 30 additions & 2 deletions src/cactus_rt/app.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,30 @@
#include "cactus_rt/utils.h"
#include "quill/Quill.h"

using FileSink = cactus_rt::tracing::FileSink;

namespace cactus_rt {

void App::RegisterThread(std::shared_ptr<BaseThread> thread) {
void App::RegisterThread(std::shared_ptr<Thread> thread) {
thread->SetTraceAggregator(&trace_aggregator_);
threads_.push_back(thread);
}

App::App(AppConfig config)
: name_(config.name), logger_config_(config.logger_config), heap_size_(config.heap_size) {
: name_(config.name),
heap_size_(config.heap_size),
logger_config_(config.logger_config),
tracer_config_(config.tracer_config),
trace_aggregator_(name_, config.tracer_config.trace_aggregator_cpu_affinity) {
if (logger_config_.default_handlers.empty()) {
SetDefaultLogFormat(logger_config_);
}

// TODO: backend_thread_notification_handler can throw - we need to handle this somehow
// logger_config_.backend_thread_notification_handler

// TODO: make the sinks more flexible
trace_aggregator_.RegisterSink(std::make_unique<FileSink>(config.tracer_config.trace_output_filename.c_str()));
}

void App::Start() {
Expand All @@ -34,18 +44,36 @@ void App::Start() {
for (auto& thread : threads_) {
thread->Start(start_monotonic_time_ns);
}

// Start trace aggregator second as the trace aggregator begins to take a
// lock, while thread startup involves taking the same lock to register the
// thread tracer. This may make startup faster, but the statement is not
// based on measured facts but just hypothesis.
trace_aggregator_.Start();
}

void App::RequestStop() {
for (auto& thread : threads_) {
thread->RequestStop();
}

RequestStopForSystemThreads();
}

void App::RequestStopForSystemThreads() {
trace_aggregator_.RequestStop();
}

void App::Join() {
for (auto& thread : threads_) {
thread->Join();
}

JoinSystemThreads();
}

void App::JoinSystemThreads() {
trace_aggregator_.Join();
}

void App::LockMemory() const {
Expand Down
7 changes: 5 additions & 2 deletions src/cactus_rt/cyclic_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@ void CyclicThread::Run() noexcept {

TraceLoopStart();

if (Loop(loop_start - Thread::StartMonotonicTimeNs())) {
break;
{
auto span = Tracer().WithSpan("CyclicThread::Loop", "cactusrt");
if (Loop(loop_start - Thread::StartMonotonicTimeNs())) {
break;
}
}

TraceLoopEnd();
Expand Down
Loading

0 comments on commit 3f15fe5

Please sign in to comment.