Skip to content

Commit

Permalink
Stream interface
Browse files Browse the repository at this point in the history
  • Loading branch information
StoneLin0708 committed Jun 28, 2024
1 parent a6b0f44 commit 3aeaf84
Show file tree
Hide file tree
Showing 13 changed files with 114 additions and 2,445 deletions.
11 changes: 1 addition & 10 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ set(SOURCE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/Source)
file(GLOB_RECURSE SRC_FILES LIST_DIRECTORIES false "${SOURCE_PATH}/*.cpp" "${SOURCE_PATH}/*.h")
set(GUI_COMMONLIB_DIR ${GUI_BASE_DIR}/installed_libs)

set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD 23)

set(CONFIGURATION_FOLDER $<$<CONFIG:Debug>:Debug>$<$<NOT:$<CONFIG:Debug>>:Release>)

Expand Down Expand Up @@ -95,10 +95,6 @@ elseif(APPLE)
set_property(TARGET ${PLUGIN_NAME} APPEND_STRING PROPERTY LINK_FLAGS
"-undefined dynamic_lookup -rpath @loader_path/../../../../shared-api8")

add_custom_command(TARGET ${PLUGIN_NAME} POST_BUILD COMMAND
install_name_tool -change "@loader_path/../Frameworks/libokFrontPanel.dylib"
"@rpath/libokFrontPanel.dylib" $<TARGET_FILE:${PLUGIN_NAME}>)

install(TARGETS ${PLUGIN_NAME} DESTINATION $ENV{HOME}/Library/Application\ Support/open-ephys/plugins-api8)
endif()

Expand All @@ -112,23 +108,18 @@ foreach( src_file IN ITEMS ${SRC_FILES})
endforeach()

# additional libraries
find_package(okFrontPanel REQUIRED)
find_package(xdaq REQUIRED)
find_package(fmt REQUIRED)
find_package(nlohmann_json REQUIRED)

target_link_libraries(${PLUGIN_NAME}
fmt::fmt
okFrontPanel::okFrontPanel
xdaq::xdaq
nlohmann_json::nlohmann_json
)
if (MSVC)
install(FILES $<TARGET_FILE:okFrontPanel::okFrontPanel> DESTINATION ${GUI_BIN_DIR}/shared CONFIGURATIONS ${CMAKE_CONFIGURATION_TYPES})
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/Resources/xdaq.bit DESTINATION ${GUI_BIN_DIR}/shared)
elseif(LINUX)
install(FILES $<TARGET_FILE:okFrontPanel::okFrontPanel> DESTINATION ${GUI_BIN_DIR}/shared)
elseif(APPLE)
install(FILES $<TARGET_FILE:okFrontPanel::okFrontPanel> DESTINATION $ENV{HOME}/Library/Application\ Support/open-ephys/shared-api8)
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/Resources/xdaq.bit DESTINATION ${GUI_BASE_DIR}/Build/open-ephys.app/Contents/Resources/shared)
endif()
207 changes: 95 additions & 112 deletions Source/DeviceThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,29 @@
*/

#include "rhythm-api/intan_chip.h"
#include "rhythm-api/rhd2000datablock.h"
#ifdef _WIN32
#define NOMINMAX
#endif


#include "DeviceThread.h"

#include <fmt/format.h>

#include <algorithm>
#include <cassert>
#include <chrono>
#include <ranges>

#include "DeviceEditor.h"
#include "DeviceThread.h"
#include "Headstage.h"
#include "ImpedanceMeter.h"
#include "rhythm-api/utils.h"

using namespace RhythmNode;

#if defined(_WIN32)
#define okLIB_NAME "okFrontPanel.dll"
#define okLIB_EXTENSION "*.dll"
#elif defined(__APPLE__)
#define okLIB_NAME "libokFrontPanel.dylib"
#define okLIB_EXTENSION "*.dylib"
#elif defined(__linux__)
#define okLIB_NAME "./libokFrontPanel.so"
#define okLIB_EXTENSION "*.so"
#endif

using namespace RhythmNode;

// #define DEBUG_EMULATE_HEADSTAGES 8
// #define DEBUG_EMULATE_64CH
Expand Down Expand Up @@ -86,9 +79,7 @@ DeviceThread::DeviceThread(SourceNode *sn)
const String executableDirectory = executable.getParentDirectory().getFullPathName();
#endif

libraryFilePath = executableDirectory + File::getSeparatorString() + okLIB_NAME;

if (openBoard(libraryFilePath)) {
if (openBoard("")) {
// upload bitfile and restore default settings
initializeBoard();

Expand Down Expand Up @@ -219,34 +210,13 @@ Array<int> DeviceThread::getDACchannels() const

bool DeviceThread::openBoard(String pathToLibrary)
{
int return_code = evalBoard->open(pathToLibrary.getCharPointer());
int return_code = evalBoard->open("");

if (return_code == 1) {
deviceFound = true;
} else if (return_code == -1) // dynamic library not found
{
bool response = AlertWindow::showOkCancelBox(
AlertWindow::NoIcon, "Opal Kelly library not found.",
"The Opal Kelly library file was not found in the directory of the executable. "
"Would you like to browse for it?",
"Yes", "No", 0, 0);
if (response) {
// browse for file
FileChooser fc("Select the library file...", File::getCurrentWorkingDirectory(),
okLIB_EXTENSION, true);

if (fc.browseForFileToOpen()) {
File currentFile = fc.getResult();
libraryFilePath = currentFile.getFullPathName();
openBoard(libraryFilePath); // call recursively
} else {
// sendActionMessage("No configuration selected.");
deviceFound = false;
}

} else {
deviceFound = false;
}
deviceFound = false;
} else if (return_code == -2) // board could not be opened
{
bool response = AlertWindow::showOkCancelBox(
Expand All @@ -255,7 +225,7 @@ bool DeviceThread::openBoard(String pathToLibrary)
0);

if (response) {
openBoard(libraryFilePath.getCharPointer()); // call recursively
openBoard(""); // call recursively
} else {
deviceFound = false;
}
Expand Down Expand Up @@ -1014,77 +984,89 @@ bool DeviceThread::startAcquisition()
evalBoard->flush();
evalBoard->setContinuousRunMode(true);
evalBoard->run();
running = true;
data_thread = std::thread([this]() {
const int numStreams = evalBoard->getNumEnabledDataStreams();
std::vector<bool> isddrstream;

const int chunk_size = 1;
Rhd2000DataBlock current_block(numStreams, chunk_size, evalBoard->get_dio32());
int nonddr = 0;
for (int i = 0; i < evalBoard->ports.max_streams; ++i) {
if (!evalBoard->isStreamEnabled(i)) continue;
bool ddr = evalBoard->ports.is_ddr(i);
isddrstream.push_back(ddr);
nonddr += !ddr;
}
isddrstream.push_back(false);
const int current_aquisition_streams = evalBoard->getNumEnabledDataStreams();
const int current_aquisition_channels =
(32 * numStreams + nonddr * 3 * settings.acquireAux +
+settings.acquireAdc * evalBoard->ports.num_of_adc);
const int buffer_samples = 256;
std::vector<unsigned char> data_buffer(evalBoard->get_sample_size<char>() * buffer_samples);
std::vector<float> output_buffer(current_aquisition_channels * chunk_size);

while (running) {
for(auto begin = data_buffer.begin(); begin != data_buffer.end();){
auto r = evalBoard->read_raw_to_buffer(data_buffer.end() - begin, &*begin);
begin +=r;
if(r == 0) std::this_thread::sleep_for(std::chrono::milliseconds(1));
if(!running) return;
}
const auto chunk_buffer_size = evalBoard->get_sample_size<char>() * chunk_size;

for (int chunk = 0; chunk < data_buffer.size() / chunk_buffer_size; ++chunk) {
current_block.from_buffer(&data_buffer[chunk_buffer_size * chunk]);

// tranpose data from Time x Channel x Stream to Time x Stream x Channel
auto target = output_buffer.begin();
for (int s = 0; s < current_aquisition_streams; ++s) {
target =
std::copy(current_block.amp.begin() + s * 32 * chunk_size,
current_block.amp.begin() + (s + 1) * 32 * chunk_size, target);
if (!settings.acquireAux | (!isddrstream[s] && isddrstream[s + 1])) continue;

for (int c = 0; c < 3; ++c) {
auto &current_aux_buffer = auxBuffer[s * 3 + c];
for (int t = 0; t < chunk_size; ++t) {
// aux is offset by 1
const int aux = (current_block.timeStamp[t] + 3) % 4;
// update aux buffer with new value that sampled every 4th sample
if (aux == c)
current_aux_buffer =
IntanChip::aux2V(current_block.aux[1][s * chunk_size+ t]);
// oversampleing by 4 times
*target++ = current_aux_buffer;
evalBoard->set_dio32(true);

int buffer_samples;
std::size_t buffered = 0;
std::vector<bool> isddrstream;
int nonddr = 0;
for (int i = 0; i < evalBoard->ports.max_streams; ++i) {
if (!evalBoard->isStreamEnabled(i)) continue;
bool ddr = evalBoard->ports.is_ddr(i);
isddrstream.push_back(ddr);
nonddr += !ddr;
}
isddrstream.push_back(false);

const int current_aquisition_channels =
(32 * evalBoard->getNumEnabledDataStreams() + nonddr * 3 * settings.acquireAux +
+settings.acquireAdc * evalBoard->ports.num_of_adc);

stream =
evalBoard->dev
->start_read_stream(
0xa0,
[this, chunk_size = 1, buffered = 0,
streams = evalBoard->getNumEnabledDataStreams(),
sample_size = evalBoard->get_sample_size<char>(),
buffer = std::vector<unsigned char>(evalBoard->get_sample_size<char>()),
acquireAdc = settings.acquireAdc, acquireAux = settings.acquireAux,
output_buffer = std::vector<float>(current_aquisition_channels * 1),
isddrstream = isddrstream, aux_buffer = std::array<float, 32 * 3>(),
sourceBuffers = &sourceBuffers](auto raw_data, std::size_t length) mutable {
std::span<unsigned char> data(raw_data.get(), length);
const int number_of_samples = (data.size() + buffered) / sample_size;
using namespace utils::endian;
auto convert_one_sample = [&](std::ranges::viewable_range auto &&raw) {
if (little2host64(raw.data()) != RHD2000_HEADER_MAGIC_NUMBER) {
fmt::print("Failed to parse data\n");
throw std::runtime_error("Failed to parse data");
}
std::int64_t ts = little2host32(raw.data() + 8);
auto target = output_buffer.begin();
const auto amp = raw.begin() + 12;
for (int s = 0; s < streams; ++s) {
for (int c = 3; c < 35; ++c) {
*(target++) =
IntanChip::amp2uV(little2host16(&*amp + (s + c * streams) * 2));
}
if (!acquireAux | (!isddrstream[s] && isddrstream[s + 1])) continue;
for (int c = 0; c < 3; ++c) {
if (((ts + 3) % 4) == c)
aux_buffer[s * 3 + c] = IntanChip::aux2V(
little2host16(&*amp + (s + c * streams) * 2));
*(target++) = aux_buffer[s * 3 + c];
}
}
const auto io = raw.end() - 8 * 2 - 4 - 4;
if (acquireAdc) {
for (int c = 0; c < 8; ++c) {
*(target++) = IntanChip::adc2V(little2host16(&*io + 2 * c));
}
}
}
}

if (settings.acquireAdc) {
std::copy(current_block.adc.begin(), current_block.adc.end(), target);
}

auto ts = std::vector<double>(chunk_size, 0);
auto ttl = std::vector<uint64_t>(chunk_size);
std::transform(current_block.ttlIn.begin(), current_block.ttlIn.end(), ttl.begin(),
[](auto t) { return t; });
sourceBuffers[0]->addToBuffer(&output_buffer[0], &current_block.timeStamp[0],
&ts[0], &ttl[0], chunk_size, chunk_size);
}
}
});
double _ts;
uint64_t ttl = little2host32(&*io + 16);
(*sourceBuffers)[0]->addToBuffer(&output_buffer[0], &ts, &_ts, &ttl,
chunk_size, chunk_size);
};

if (buffered > 0) {
const int remaining = sample_size - buffered;
std::copy(data.begin(), data.begin() + remaining,
buffer.begin() + buffered);
buffered = 0;
data = data.subspan(remaining);
convert_one_sample(buffer);
}
std::ranges::for_each(data.subspan(0, data.size() / sample_size * sample_size) |
std::ranges::views::chunk(sample_size),
convert_one_sample);
auto leftover = data.size() % sample_size;
std::copy(data.end() - leftover, data.end(), buffer.begin());
buffered = leftover;
})
.value(),

startThread();

Expand All @@ -1096,9 +1078,9 @@ bool DeviceThread::startAcquisition()
bool DeviceThread::stopAcquisition()
{
// LOGD("RHD2000 data thread stopping acquisition.");
running = false;
if (data_thread.joinable())
data_thread.join();
if (stream) {
stream.value()->stop();
}

if (isThreadRunning()) {
signalThreadShouldExit();
Expand Down Expand Up @@ -1134,6 +1116,7 @@ bool DeviceThread::stopAcquisition()

bool DeviceThread::updateBuffer()
{
return true;
if (updateSettingsDuringAcquisition) {
LOGD("DAC");
for (int k = 0; k < 8; k++) {
Expand Down
12 changes: 5 additions & 7 deletions Source/DeviceThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#pragma once
#include <DataThreadHeaders.h>
#include <xdaq/device_plugin.h>

#include <array>
#include <atomic>
Expand All @@ -33,6 +34,7 @@
#include "rhythm-api/rhd2000evalboard.h"
#include "rhythm-api/rhd2000registers.h"


#define CHIP_ID_RHD2132 1
#define CHIP_ID_RHD2216 2
#define CHIP_ID_RHD2164 4
Expand Down Expand Up @@ -195,6 +197,9 @@ class DeviceThread : public DataThread
bool set_dio32(bool enable) { return evalBoard->set_dio32(enable); }

private:
std::optional<std::unique_ptr<xdaq::DevicePlugin::PluginOwnedDevice::element_type::DataStream>>
stream;

std::queue<DigitalOutputCommand> digitalOutputCommands;

OwnedArray<DigitalOutputTimer> digitalOutputTimers;
Expand Down Expand Up @@ -273,9 +278,6 @@ class DeviceThread : public DataThread

} settings;

/** Path to Opal Kelly library file*/
String libraryFilePath;

/** Open the connection to the acquisition board*/
bool openBoard(String pathToLibrary);

Expand Down Expand Up @@ -308,10 +310,6 @@ class DeviceThread : public DataThread

StringArray channelNames;

std::thread data_thread;
std::atomic_bool running;


JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR(DeviceThread);
};

Expand Down
Loading

0 comments on commit 3aeaf84

Please sign in to comment.