Skip to content

Commit

Permalink
Significantly refactored trace aggregator
Browse files Browse the repository at this point in the history
Previously we were leaking ThreadTracer objects in the TraceAggregator
as creating new threads means ThreadTracer gets pushed into
TraceAggregator but it is never removed. This causes a memory leak and
also makes the TraceAggregator slower.

This refactors the entire code to make this work. Also removed the
dependency of Thread on the App* pointer, which is unsafe and can cause
segfaults in some extreme situations.
  • Loading branch information
shuhaowu committed Jul 24, 2024
1 parent 2f1cb13 commit f7ded03
Show file tree
Hide file tree
Showing 12 changed files with 226 additions and 385 deletions.
3 changes: 2 additions & 1 deletion .clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ cert-*,
-readability-identifier-length,
-readability-isolate-declaration,
-readability-magic-numbers,
-readability-redundant-inline-specifier'
-readability-redundant-inline-specifier,
-readability-use-anyofallof'
# TODO: Re-enable bugprone-exception-escape when no longer throwing
# https://github.com/isocpp/CppCoreGuidelines/issues/1589
WarningsAsErrors: '*'
Expand Down
8 changes: 2 additions & 6 deletions examples/tracing_example_no_rt/main.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include <cactus_rt/tracing.h>

#include <list>
#include <memory>
#include <thread>

Expand All @@ -25,18 +24,15 @@ void StartTracing(const char* app_name, const char* filename) {

// Create the file sink so the data aggregated by the TraceAggregator will be written to somewhere.
auto file_sink = std::make_shared<FileSink>(filename);
trace_aggregator->RegisterSink(file_sink);

quill::start();
trace_aggregator->Start();
trace_aggregator->Start(file_sink);
}

void StopTracing() {
cactus_rt::tracing::DisableTracing();

trace_aggregator->RequestStop();
trace_aggregator->Join();
trace_aggregator = nullptr; // Destroy the trace aggregator and free all resources.
trace_aggregator->Stop();
}

int main() {
Expand Down
27 changes: 1 addition & 26 deletions include/cactus_rt/app.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

#include <gtest/gtest_prod.h>

#include <list>
#include <memory>
#include <string>
#include <vector>
Expand Down Expand Up @@ -36,14 +35,7 @@ class App {

std::vector<std::shared_ptr<Thread>> threads_;

// We need to cache the list thread tracers here because the trace_aggregator
// can be dynamically created and stopped. When a new trace aggregator is
// created, it needs to know about all the thread tracers.
//
// TODO: investigate into a weak pointer.
std::list<std::shared_ptr<tracing::ThreadTracer>> thread_tracers_;
std::unique_ptr<tracing::TraceAggregator> trace_aggregator_ = nullptr;
std::mutex aggregator_mutex_;
std::shared_ptr<tracing::TraceAggregator> trace_aggregator_;

void SetDefaultLogFormat(quill::Config& cfg) {
// Create a handler of stdout
Expand Down Expand Up @@ -117,11 +109,6 @@ class App {
*/
bool StartTraceSession(std::shared_ptr<tracing::Sink> sink) noexcept;

/**
* @brief Register a custom trace sink after starting the trace session
*/
void RegisterTraceSink(std::shared_ptr<tracing::Sink> sink) noexcept;

/**
* @brief Stops the tracing session for the process. Will be no-op if tracing
* is not enabled. This function is not real-time safe.
Expand All @@ -148,18 +135,6 @@ class App {
void StartQuill();

private:
/**
* @brief Register a thread tracer. Should only be called from Thread::RunThread.
*/
void RegisterThreadTracer(std::shared_ptr<tracing::ThreadTracer> thread_tracer) noexcept;

/**
* @brief Remove a thread tracer. Should only be called from Thread::~Thread().
*/
void DeregisterThreadTracer(const std::shared_ptr<tracing::ThreadTracer>& thread_tracer) noexcept;

void CreateAndStartTraceAggregator(std::shared_ptr<tracing::Sink> sink) noexcept;

void StopTraceAggregator() noexcept;
};
} // namespace cactus_rt
Expand Down
28 changes: 13 additions & 15 deletions include/cactus_rt/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,25 @@
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include "config.h"
#include "quill/Quill.h"
#include "tracing/thread_tracer.h"
#include "tracing/trace_aggregator.h"

namespace cactus_rt {

/// @private
constexpr size_t kDefaultStackSize = 8 * 1024 * 1024; // 8MB default stack space should be plenty

// Necessary forward declaration
class App;

class Thread {
ThreadConfig config_;
std::string name_;
std::vector<size_t> cpu_affinity_;
size_t stack_size_;

quill::Logger* logger_;
std::shared_ptr<tracing::ThreadTracer> tracer_;
std::shared_ptr<tracing::ThreadTracer> tracer_ = nullptr;

std::atomic_bool stop_requested_ = false;

Expand All @@ -41,12 +38,10 @@ class Thread {
*/
static void* RunThread(void* data);

friend class App;

// Non-owning App pointer. Used only for notifying that the thread has
// started/stopped for tracing purposes. Set by Thread::Start and read at
// Non-owning TraceAggregator pointer. Used only for notifying that the thread
// has started/stopped for tracing purposes. Set by Thread::Start and read at
// the beginning of Thread::RunThread.
App* app_ = nullptr;
std::weak_ptr<tracing::TraceAggregator> trace_aggregator_;

public:
/**
Expand All @@ -60,8 +55,7 @@ class Thread {
name_(name),
cpu_affinity_(config_.cpu_affinity),
stack_size_(static_cast<size_t>(PTHREAD_STACK_MIN) + config_.stack_size),
logger_(quill::create_logger(name_)),
tracer_(std::make_shared<tracing::ThreadTracer>(name, config_.tracer_config.queue_size)) {
logger_(quill::create_logger(name_)) {
if (!config.scheduler) {
throw std::runtime_error("ThreadConfig::scheduler cannot be nullptr");
}
Expand Down Expand Up @@ -123,12 +117,16 @@ class Thread {
*
* @private
*/
inline void SetApp(App* app) {
app_ = app;
inline void SetTraceAggregator(std::weak_ptr<tracing::TraceAggregator> trace_aggregator) {
trace_aggregator_ = trace_aggregator;
}

protected:
inline quill::Logger* Logger() const { return logger_; }
inline quill::Logger* Logger() const { return logger_; }

/**
* Gets the current tracer object. Should only ever be called from within the thread itself.
*/
inline tracing::ThreadTracer& Tracer() { return *tracer_; }
inline int64_t StartMonotonicTimeNs() const { return start_monotonic_time_ns_; }
inline const ThreadConfig& Config() const noexcept { return config_; }
Expand Down
23 changes: 23 additions & 0 deletions include/cactus_rt/tracing/thread_tracer.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef CACTUS_TRACING_THREAD_TRACER_H_
#define CACTUS_TRACING_THREAD_TRACER_H_

#include <atomic>
#ifndef CACTUS_RT_TRACING_ENABLED
#include "thread_tracer.disabled.h"
#else
Expand Down Expand Up @@ -36,6 +37,8 @@ class ThreadTracer {

moodycamel::ReaderWriterQueue<TrackEventInternal> queue_;

std::atomic_bool thread_done_;

// The event name interning must be done per thread (per sequence). Thus it is
// stored here. However, this class must NEVER call functions here (other
// than maybe .Size), as the memory allocation can occur. This variable is
Expand Down Expand Up @@ -84,6 +87,26 @@ class ThreadTracer {
*/
void SetTid() noexcept { tid_ = gettid(); }

/**
* @brief This marks this thread tracer as "done" and thus the trace
* aggregator will try to remove it after flushing the data.
*
* @private
*/
void MarkDone() noexcept {
thread_done_.store(true, std::memory_order_release);
}

/**
* @brief Checks if this thread tracer is done. Should only be called from
* TraceAggregator.
*
* @private
*/
bool IsDone() noexcept {
return thread_done_.load(std::memory_order_acquire);
}

private:
template <typename... Args>
bool Emit(Args&&... args) noexcept;
Expand Down
Loading

0 comments on commit f7ded03

Please sign in to comment.