Skip to content

Commit

Permalink
WIP lockless example
Browse files Browse the repository at this point in the history
  • Loading branch information
shuhaowu committed Apr 22, 2024
1 parent efb41a5 commit 3c6be84
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 1 deletion.
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,9 @@ if(${CMAKE_PROJECT_NAME} STREQUAL ${PROJECT_NAME})

if (ENABLE_EXAMPLES)
message(STATUS "Building example programs. Turn it off via ENABLE_EXAMPLES=OFF")
add_subdirectory(examples/message_passing_example)
add_subdirectory(examples/lockless_example)
add_subdirectory(examples/logging_example)
add_subdirectory(examples/message_passing_example)
add_subdirectory(examples/mutex_example)
add_subdirectory(examples/signal_handling_example)
add_subdirectory(examples/simple_deadline_example)
Expand Down
10 changes: 10 additions & 0 deletions examples/lockless_example/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
add_executable(rt_lockless_example
main.cc
)

target_link_libraries(rt_lockless_example
PRIVATE
cactus_rt
)

setup_cactus_rt_target_options(rt_lockless_example)
145 changes: 145 additions & 0 deletions examples/lockless_example/main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
#include <cactus_rt/experimental/lockless.h>
#include <cactus_rt/rt.h>

#include <chrono>
#include <iostream>

using cactus_rt::App;
using cactus_rt::CyclicThread;
using cactus_rt::Thread;
using cactus_rt::experimental::lockless::AtomicMessage;
using cactus_rt::experimental::lockless::spsc::RealtimeReadableValue;
using namespace std::chrono_literals;

struct Pose {
// We want default constructed values to have a flag showing it is default
// constructed. This is because the RealtimeReadableValue will default
// construct a value and it can immediately be read. We need to tell the writer
// it is invalid. It may not be necessary to do this in general.
bool valid = false;
double x = 0.0;
double y = 0.0;
double z = 0.0;
double roll = 0.0;
double pitch = 0.0;
double yaw = 0.0;

Pose() {}

Pose(double xx, double yy, double zz, double ro, double pi, double ya) : valid(true),
x(xx),
y(yy),
z(zz),
roll(ro),
pitch(pi),
yaw(ya) {}
};

bool operator==(const Pose& p1, const Pose& p2) {
return p1.x == p2.x &&
p1.y == p2.y &&
p1.z == p2.z &&
p1.roll == p2.roll &&
p1.pitch == p2.pitch &&
p1.yaw == p2.yaw;
}

bool operator!=(const Pose& p1, const Pose& p2) {
return !(p1 == p2);
}

/**
* A struct that holds all the shared data so it can be passed to both the real-time and non-real-time threads
*/
struct Context {
AtomicMessage<bool> done = false;
RealtimeReadableValue<Pose> target_pose = {};
};

/**
* This is a real-time thread
*/
class RTThread : public CyclicThread {
Context& ctx_;
Pose current_target_pose_ = {};

static cactus_rt::CyclicThreadConfig CreateThreadConfig() {
cactus_rt::CyclicThreadConfig thread_config;
thread_config.period_ns = 1'000'000;
thread_config.cpu_affinity = std::vector<size_t>{2};
thread_config.SetFifoScheduler(80);

return thread_config;
}

public:
RTThread(Context& ctx) : CyclicThread("RTThread", CreateThreadConfig()), ctx_(ctx) {}

protected:
bool Loop(int64_t /*now*/) noexcept final {
if (ctx_.done.Read()) {
return true;
}

Pose new_pose = ctx_.target_pose.Read();
if (!new_pose.valid) {
return false;
}

if (new_pose != current_target_pose_) {
current_target_pose_ = new_pose;
LOG_INFO(
Logger(),
"detected new pose: {} {} {} {} {} {}",
current_target_pose_.x,
current_target_pose_.y,
current_target_pose_.z,
current_target_pose_.roll,
current_target_pose_.pitch,
current_target_pose_.yaw
);
}

return false;
}
};

class NonRTThread : public Thread {
Context& ctx_;

static cactus_rt::ThreadConfig CreateThreadConfig() {
cactus_rt::ThreadConfig thread_config;
thread_config.SetOtherScheduler();
return thread_config;
}

public:
NonRTThread(Context& ctx) : Thread("NonRTThread", CreateThreadConfig()), ctx_(ctx) {}

void Run() final {
ctx_.target_pose.Write(Pose(1.5, 2.5, 3.5, 4.5, 5.5, 6.5));
std::this_thread::sleep_for(1s);

// Realistically only one of these values should be visible on the real-time thread.
ctx_.target_pose.Write(Pose(1.5, 2.5, 3.5, 4.5, 5.5, 7.5));
ctx_.target_pose.Write(Pose(1.5, 2.5, 3.5, 4.5, 5.5, 8.5));
std::this_thread::sleep_for(1s);

ctx_.done.Write(true);
}
};

int main() {
Context ctx;
auto rt_thread = std::make_shared<RTThread>(ctx);
auto non_rt_thread = std::make_shared<NonRTThread>(ctx);

App app;
app.RegisterThread(rt_thread);
app.RegisterThread(non_rt_thread);

app.Start();
app.Join();

return 0;
}
8 changes: 8 additions & 0 deletions include/cactus_rt/experimental/lockless.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#ifndef CACTUS_RT_EXPERIMENTAL_LOCKLESS_H_
#define CACTUS_RT_EXPERIMENTAL_LOCKLESS_H_

#include "lockless/atomic_message.h"
#include "lockless/spsc/realtime_readable_value.h"
#include "lockless/spsc/realtime_writable_value.h"

#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#ifndef CACTUS_RT_EXPERIMENTAL_LOCKLESS_SPSC_REALTIME_READABLE_VALUE_
#define CACTUS_RT_EXPERIMENTAL_LOCKLESS_SPSC_REALTIME_READABLE_VALUE_

#include <atomic>
#include <memory>

namespace cactus_rt::experimental::lockless::spsc {

/**
* This uses the CAS exchange algorithm to allow a single (real-time) thread to
* be able to atomically read a value shared by a different (non-real-time)
* thread.
*
* The reader for this algorithm is wait-free while the writer is lock-free. The
* reader is unable to modify the value and transmit it back to the writer with
* this algorithm.
*/
template <typename T>
class RealtimeReadableValue {
std::unique_ptr<T> storage_ptr_ = std::make_unique<T>();
std::atomic<T*> atomic_ptr_ = storage_ptr_.get();

public:
T Read() {
// TODO: need to figure out the atomic memory order here!
T* data_ptr = atomic_ptr_.exchange(nullptr);
T data = *data_ptr;
atomic_ptr_.store(data_ptr);
return data;
}

void Write(const T& new_value) {
auto new_ptr = std::make_unique<T>(new_value);
T* expected;

do {
expected = storage_ptr_.get();
// TODO: sequential consistency is probably wrong here. Need to understand if acq_rel is sufficient.
} while (!atomic_ptr_.compare_exchange_weak(expected, new_ptr.get()));

storage_ptr_ = std::move(new_ptr);
}
};

} // namespace cactus_rt::experimental::lockless::spsc

#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#ifndef CACTUS_RT_EXPERIMENTAL_LOCKLESS_SPSC_REALTIME_WRITABLE_VALUE_
#define CACTUS_RT_EXPERIMENTAL_LOCKLESS_SPSC_REALTIME_WRITABLE_VALUE_

namespace cactus_rt::experimental::lockless::spsc {

}

#endif

0 comments on commit 3c6be84

Please sign in to comment.