Skip to content

Commit

Permalink
Added first pass of tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
shuhaowu committed Jul 13, 2023
1 parent 5f20692 commit 1720ea1
Show file tree
Hide file tree
Showing 8 changed files with 393 additions and 2 deletions.
6 changes: 4 additions & 2 deletions .vscode/c_cpp_properties.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
"includePath": [
"${workspaceFolder}/include",
"${workspaceFolder}/build/debug/_deps/**",
"${workspaceFolder}/build/debug/_deps/readerwriterqueue-src"
"${workspaceFolder}/build/debug/_deps/readerwriterqueue-src",
"${workspaceFolder}/build/debug/protos"
],
"defines": [],
"compilerPath": "/usr/bin/gcc",
Expand All @@ -19,7 +20,8 @@
"includePath": [
"${workspaceFolder}/include",
"${workspaceFolder}/build/release/_deps/**",
"${workspaceFolder}/build/debug/_deps/readerwriterqueue-src"
"${workspaceFolder}/build/release/_deps/readerwriterqueue-src",
"${workspaceFolder}/build/release/protos"
],
"defines": [],
"compilerPath": "/usr/bin/gcc",
Expand Down
5 changes: 5 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ add_library(cactus_rt
src/cactus_rt/thread.cc
src/cactus_rt/cyclic_thread.cc
src/cactus_rt/signal_handler.cc
src/cactus_rt/tracing/thread_tracer.cc
)

target_include_directories(cactus_rt
Expand All @@ -128,6 +129,10 @@ target_compile_definitions(cactus_rt PUBLIC QUILL_USE_BOUNDED_QUEUE)

if (ENABLE_TRACING)
target_compile_definitions(cactus_rt PUBLIC CACTUS_RT_ENABLE_TRACING=1)
target_link_libraries(cactus_rt
PUBLIC
cactus_tracing_embedded_perfetto_protos
)
endif()

if(${CMAKE_PROJECT_NAME} STREQUAL ${PROJECT_NAME})
Expand Down
65 changes: 65 additions & 0 deletions include/cactus_rt/experimental/lockless/atomic_message.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#ifndef CACTUS_RT_EXPERIMENTAL_LOCKLESS_ATOMIC_MESSAGE_H_
#define CACTUS_RT_EXPERIMENTAL_LOCKLESS_ATOMIC_MESSAGE_H_

#include <atomic>
#include <utility>

namespace cactus_rt::experimental::lockless {

/**
* @brief A simple wrapper around a std::atomic to provide a real-time safe
* compare-and-swap operation (via .Modify()).
*
* @tparam T the data type to be wrapped. std::atomic<T>::is_always_lock_free must be true.
*/
template <typename T, std::memory_order memory_order = std::memory_order_seq_cst>
class AtomicMessage {
static_assert(std::atomic<T>::is_always_lock_free, "AtomicMessage<T> requires type T to be usable with std::atomic<T>::is_always_lock_free");
std::atomic<T> data_;

public:
/**
* @brief Constructs a new AtomicMessage using the constructor of the
* underlying type.
*/
template <typename... Args>
AtomicMessage(Args&&... args) : data_(std::forward<Args>(args)...) {}

/**
* @brief Returns a copy of the data in a lock-less manner.
*/
T Read() const noexcept {
return data_.load(memory_order);
}

/**
* @brief Writes into the variable in a lock-less manner.
*/
void Write(const T& data) noexcept {
return data_.store(data, memory_order);
}

/**
* @brief Modify the variable via an operator function in a lock-less manner (but not wait-free).
*
* @param f A function that takes the old value and returns the new value.
* Maybe called multiple times. To be real-time safe, this can be a function
* pointer or a lambda function that doesn't capture anything that can cause
* dynamic memory allocation.
*/
template <typename Func>
void Modify(Func f) noexcept {
auto old_value = Read();

while (true) {
T new_value = f(old_value);
if (data_.compare_exchange_weak(old_value, new_value, memory_order)) {
break;
}
}
}
};

} // namespace cactus_rt::experimental::lockless

#endif
29 changes: 29 additions & 0 deletions include/cactus_rt/tracing/sink.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#ifndef CACTUS_RT_TRACING_SINK_H_
#define CACTUS_RT_TRACING_SINK_H_

#include <fstream>

#include "trace.pb.h"

namespace cactus_rt::tracing {
/**
* An arbitrary sink where trace packets can be written to.
*/
class Sink {
public:
virtual bool Write(const cactus_tracing::vendor::perfetto::protos::Trace& trace) = 0;
};

/**
* A file sink where trace data is written to a file
*/
class FileSink : public Sink {
std::fstream file_;

public:
explicit FileSink(const char* filename);

bool Write(const cactus_tracing::vendor::perfetto::protos::Trace& trace) final;
};
} // namespace cactus_rt::tracing
#endif
85 changes: 85 additions & 0 deletions include/cactus_rt/tracing/thread_tracer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#ifndef CACTUS_RT_TRACING_THREAD_TRACER_H_
#define CACTUS_RT_TRACING_THREAD_TRACER_H_

#include <readerwriterqueue.h>

#include <cstdint>

#include "../experimental/lockless/atomic_message.h"
#include "track_event_internal.h"

namespace cactus_rt::tracing {
// Forward declaration to avoid circular dependencies
class Tracer;
class TraceSpan;

struct EventCountData {
uint32_t total_events;
uint32_t dropped_events;
};

/**
* @brief A tracer for a single thread. Creates a single track in the trace
* output. Should only have one per thread.
*/
class ThreadTracer {
friend class Tracer;

const Tracer& tracer_;

const char* thread_name_;
int32_t thread_tid_;
uint64_t thread_track_uuid_;
uint32_t trusted_packet_sequence_id_;

uint32_t queue_capacity_;
moodycamel::ReaderWriterQueue<TrackEventInternal> queue_;

cactus_rt::experimental::lockless::AtomicMessage<EventCountData> event_count_;

public:
ThreadTracer(
const Tracer& tracer,
const char* thread_name,
int32_t thread_tid,
uint64_t thread_track_uuid,
uint32_t trusted_packet_sequence_id,
uint32_t queue_capacity = 16384 // TODO: probably need a better way to specify this
);

bool StartSpan(const char* name, const char* category = nullptr) noexcept;
bool EndSpan() noexcept;
TraceSpan WithSpan(const char* name, const char* category = nullptr) noexcept;
bool InstantEvent(const char* name, const char* category = nullptr) noexcept;

template <typename... Args>
bool Emit(Args&&... args) noexcept;

inline EventCountData EventCount() noexcept {
return event_count_.Read();
}

private:
void IncrementEventCount(bool dropped) noexcept;
};

class TraceSpan {
friend class ThreadTracer;
ThreadTracer* tracer_;

TraceSpan(ThreadTracer* tracer, const char* name, const char* category = nullptr);

public:
~TraceSpan();

TraceSpan(const TraceSpan&) = delete;
TraceSpan& operator=(const TraceSpan&) = delete;

// Move OK, but copy is not?
// TODO: check if this move constructor is necessary. It's likely that it doesn't need it.
TraceSpan(TraceSpan&&) = default;
TraceSpan& operator=(TraceSpan&&) = default;
};

} // namespace cactus_rt::tracing
#endif
138 changes: 138 additions & 0 deletions include/cactus_rt/tracing/tracer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#ifndef CACTUS_RT_TRACING_TRACER_H_
#define CACTUS_RT_TRACING_TRACER_H_

#include <atomic>
#include <cstdint>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

#include "../thread.h"
#include "sink.h"
#include "thread_tracer.h"
#include "trace.pb.h"

// TODO: this feature is not fully fleshed out,
#ifndef CACTUS_RT_TRACING_STARTS_ENABLED
#define CACTUS_RT_TRACING_STARTS_ENABLED true
#endif

namespace cactus_rt::tracing {

/**
* @brief This is the main tracer object where the ThreadTracers are registered.
* Only one of these should be created per process.
*
* This is also the background thread where the trace packets are processed and
* sinked.
*/
class Tracer : public cactus_rt::Thread<> {
const char* process_name_;
int32_t process_pid_;
uint64_t process_track_uuid_;

// Whether or not tracing is enabled. This can be dynamically controlled via
// EnableTracing and DisableTracing.
std::atomic_bool tracing_enabled_ = CACTUS_RT_TRACING_STARTS_ENABLED;

// This mutex is for adding/removing thread tracers (sources) and
// adding/removing sinks. These operations do not interfere the emission of
// of trace events and thus the emission of trace events are not protected by
// this mutex. This is good, because having such a mutex is not
// real-time-safe.
//
// Using this mutex, we can block the sinking of trace events while we change
// the sources and sinks. Every time a source is added, we need to create
// tracks and sink them into the protobuf trace message stream. Every time a
// sink is added, we need to ensure all the track descriptor packets and other
// sticky packets are emitted first before the events. This mutex helps
// block the sinking of normal trace events so these track descriptor packets
// can be written in the right order.
std::mutex mutex_;

// A vector of sinks that we can write the trace packets to.
std::vector<std::unique_ptr<Sink>> sinks_;

// This is a vector of all known thread tracers. The background processing
// thread will loop through this and pop all data from the queues.
// Tracer is a friend class of ThreadTracer and thus can access all private
// variables. These two structs are supposed to be tightly coupled so this is
// no problem.
std::vector<ThreadTracer> thread_tracers_;

// This is a vector of sticky trace packets that should always be emitted
// when a new sink connects to the tracer. When a new sink connects to the
// tracer, these packet will be sent first, before any events are sent.
//
// Packets that should be included here are things like the process/thread
// track descriptor packets, or any other packets that affect the trace
// globally and must be emitted before events are emitted.
//
// The list of packets only grow here (although shouldn't grow that much).
std::vector<cactus_tracing::vendor::perfetto::protos::Trace> sticky_trace_packets_;

public:
/**
* @brief Constructs a new Tracer. Should only be called once per process.
*/
Tracer();

// No copy
Tracer(const Tracer&) = delete;
Tracer& operator=(const Tracer&) = delete;

// No move
Tracer(Tracer&&) noexcept = delete;
Tracer& operator=(Tracer&&) noexcept = delete;

/**
* @brief Creates a new thread tracer. Each thread should only have one of
* these and it should be called during initialization of the thread.
*/
ThreadTracer& CreateThreadTracer();

/**
* @brief Adds a sink. Not real-time safe.
*/
void AddSink(std::unique_ptr<Sink> sink);

/**
* @brief Dynamically enables tracing in a thread-safe manner.
*
* This feature is not fully functional. For example, should enable tracing
* take a filename and reset the sinks so we don't write one giant file?
*/
void EnableTracing() noexcept {
tracing_enabled_.store(true, std::memory_order_relaxed);
}

/**
* @brief Dynamically disables tracing in a thread-safe manner.
*
* This feature is not fully functional. For example, should enable tracing
* take a filename and reset the sinks so we don't write one giant file?
*/
void DisableTracing() noexcept {
tracing_enabled_.store(false, std::memory_order_relaxed);
}

/**
* @brief Checks if tracing is enabled. Thread safe and safe to call from RT.
*
* You don't usually need to call this manually, as the methods that emits
* trace events on ThreadTracers will call this method internally.
*
* @returns true if tracing is enabled, false otherwise.
*/
inline bool IsTracingEnabled() noexcept {
// TODO: give likely hint if CACTUS_RT_TRACING_STARTS_ENABLED is true...
return tracing_enabled_.load(std::memory_order_relaxed);
}

protected:
void Run() final;
};
} // namespace cactus_rt::tracing

#endif
Loading

0 comments on commit 1720ea1

Please sign in to comment.